Author: rajdavies
Date: Thu Mar 9 03:22:26 2006
New Revision: 384492
URL: http://svn.apache.org/viewcvs?rev=384492&view=rev
Log:
tidied up subscription objectNames
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=384492&r1=384491&r2=384492&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Thu Mar 9 03:22:26 2006
@@ -22,7 +22,6 @@
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
-
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
@@ -34,7 +33,6 @@
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
@@ -62,10 +60,8 @@
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
-
public class ManagedRegionBroker extends RegionBroker{
private static final Log log=LogFactory.getLog(ManagedRegionBroker.class);
private final MBeanServer mbeanServer;
@@ -80,42 +76,37 @@
private final Map inactiveDurableTopicSubscribers=new ConcurrentHashMap();
private final Map temporaryQueueSubscribers=new ConcurrentHashMap();
private final Map temporaryTopicSubscribers=new ConcurrentHashMap();
- private final Map subscriptionKeys = new ConcurrentHashMap();
- private final Map subscriptionMap = new ConcurrentHashMap();
- private final Set registeredMBeans = new CopyOnWriteArraySet();
-
+ private final Map subscriptionKeys=new ConcurrentHashMap();
+ private final Map subscriptionMap=new ConcurrentHashMap();
+ private final Set registeredMBeans=new CopyOnWriteArraySet();
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
public ManagedRegionBroker(BrokerService brokerService,MBeanServer
mbeanServer,ObjectName brokerObjectName,
- TaskRunnerFactory taskRunnerFactory,UsageManager
memoryManager,PersistenceAdapter adapter) throws IOException{
+ TaskRunnerFactory taskRunnerFactory,UsageManager
memoryManager,PersistenceAdapter adapter)
+ throws IOException{
super(brokerService,taskRunnerFactory,memoryManager,adapter);
this.mbeanServer=mbeanServer;
this.brokerObjectName=brokerObjectName;
}
-
- public void start() throws Exception {
+
+ public void start() throws Exception{
super.start();
- //build all existing durable subscriptions
+ // build all existing durable subscriptions
buildExistingSubscriptions();
-
}
-
- protected void doStop(ServiceStopper stopper) {
+ protected void doStop(ServiceStopper stopper){
super.doStop(stopper);
-
// lets remove any mbeans not yet removed
- for (Iterator iter = registeredMBeans.iterator(); iter.hasNext();) {
- ObjectName name = (ObjectName) iter.next();
- try {
+ for(Iterator iter=registeredMBeans.iterator();iter.hasNext();){
+ ObjectName name=(ObjectName) iter.next();
+ try{
mbeanServer.unregisterMBean(name);
- }
- catch (InstanceNotFoundException e) {
- log.warn("The MBean: " + name + " is no longer registered with
JMX");
- }
- catch (Exception e) {
- stopper.onException(this, e);
+ }catch(InstanceNotFoundException e){
+ log.warn("The MBean: "+name+" is no longer registered with
JMX");
+ }catch(Exception e){
+ stopper.onException(this,e);
}
}
registeredMBeans.clear();
@@ -141,12 +132,12 @@
public void register(ActiveMQDestination destName,Destination destination){
try{
- ObjectName objectName = createObjectName(destName);
+ ObjectName objectName=createObjectName(destName);
DestinationView view;
if(destination instanceof Queue){
- view=new QueueView(this, (Queue) destination);
+ view=new QueueView(this,(Queue) destination);
}else{
- view=new TopicView(this, (Topic) destination);
+ view=new TopicView(this,(Topic) destination);
}
registerDestination(objectName,destName,view);
}catch(Exception e){
@@ -156,7 +147,7 @@
public void unregister(ActiveMQDestination destName){
try{
- ObjectName objectName = createObjectName(destName);
+ ObjectName objectName=createObjectName(destName);
unregisterDestination(objectName);
}catch(Exception e){
log.error("Failed to unregister "+destName,e);
@@ -164,32 +155,30 @@
}
public void registerSubscription(ConnectionContext context,Subscription
sub){
- SubscriptionKey key = new
SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName());
Hashtable map=brokerObjectName.getKeyPropertyList();
- String name = key.toString();
+ String name="";
+ SubscriptionKey key=new
SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName());
+ if(sub.getConsumerInfo().isDurable()){
+ name=key.toString();
+ }
+
if(sub.getConsumerInfo()!=null&&sub.getConsumerInfo().getConsumerId()!=null){
+ name+="."+sub.getConsumerInfo().getConsumerId();
+ }
try{
-
- ObjectName objectName = new ObjectName(
- brokerObjectName.getDomain()+":"+
- "BrokerName="+map.get("BrokerName")+","+
- "Type=Subscription,"+
- "active=true,"+
- "name="+JMXSupport.encodeObjectNamePart(name)+""
- );
-
+ ObjectName objectName=new
ObjectName(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")
+
+","+"Type=Subscription,"+"active=true,"+"name="+JMXSupport.encodeObjectNamePart(name)+"");
SubscriptionView view;
if(sub.getConsumerInfo().isDurable()){
view=new
DurableSubscriptionView(this,context.getClientId(),sub);
}else{
- if (sub instanceof TopicSubscription) {
- view = new
TopicSubscriptionView(context.getClientId(),(TopicSubscription) sub);
- }
- else {
+ if(sub instanceof TopicSubscription){
+ view=new
TopicSubscriptionView(context.getClientId(),(TopicSubscription) sub);
+ }else{
view=new SubscriptionView(context.getClientId(),sub);
}
}
- subscriptionMap.put(sub,objectName);
registerSubscription(objectName,sub.getConsumerInfo(),key,view);
+ subscriptionMap.put(sub,objectName);
}catch(Exception e){
log.error("Failed to register subscription "+sub,e);
}
@@ -233,7 +222,8 @@
mbeanServer.unregisterMBean(key);
}
- protected void registerSubscription(ObjectName key,ConsumerInfo
info,SubscriptionKey subscriptionKey,SubscriptionView view) throws Exception{
+ protected void registerSubscription(ObjectName key,ConsumerInfo
info,SubscriptionKey subscriptionKey,
+ SubscriptionView view) throws Exception{
ActiveMQDestination dest=info.getDestination();
if(dest.isQueue()){
if(dest.isTemporary()){
@@ -247,16 +237,16 @@
}else{
if(info.isDurable()){
durableTopicSubscribers.put(key,view);
- //unregister any inactive durable subs
- try {
- ObjectName inactiveName = (ObjectName)
subscriptionKeys.get(subscriptionKey);
- if (inactiveName != null){
+ // unregister any inactive durable subs
+ try{
+ ObjectName inactiveName=(ObjectName)
subscriptionKeys.get(subscriptionKey);
+ if(inactiveName!=null){
inactiveDurableTopicSubscribers.remove(inactiveName);
registeredMBeans.remove(inactiveName);
mbeanServer.unregisterMBean(inactiveName);
}
}catch(Exception e){
- log.error("Unable to unregister inactive durable
subscriber: " + subscriptionKey,e);
+ log.error("Unable to unregister inactive durable
subscriber: "+subscriptionKey,e);
}
}else{
topicSubscribers.put(key,view);
@@ -275,72 +265,64 @@
temporaryTopicSubscribers.remove(key);
registeredMBeans.remove(key);
mbeanServer.unregisterMBean(key);
- DurableSubscriptionView view = (DurableSubscriptionView)
durableTopicSubscribers.remove(key);
- if (view != null){
- //need to put this back in the inactive list
- SubscriptionKey subscriptionKey = new
SubscriptionKey(view.getClientId(),view.getSubscriptionName());
- SubscriptionInfo info = new SubscriptionInfo();
+ DurableSubscriptionView view=(DurableSubscriptionView)
durableTopicSubscribers.remove(key);
+ if(view!=null){
+ // need to put this back in the inactive list
+ SubscriptionKey subscriptionKey=new
SubscriptionKey(view.getClientId(),view.getSubscriptionName());
+ SubscriptionInfo info=new SubscriptionInfo();
info.setClientId(subscriptionKey.getClientId());
info.setSubcriptionName(subscriptionKey.getSubscriptionName());
info.setDestination(new ActiveMQTopic(view.getDestinationName()));
- addInactiveSubscription(subscriptionKey, info);
+ addInactiveSubscription(subscriptionKey,info);
}
-
-
}
-
+
protected void buildExistingSubscriptions() throws Exception{
- Map subscriptions = new HashMap();
- Set destinations = adaptor.getDestinations();
- if (destinations != null){
- for (Iterator iter = destinations.iterator(); iter.hasNext();){
- ActiveMQDestination dest = (ActiveMQDestination) iter.next();
- if (dest.isTopic()){
- TopicMessageStore store =
adaptor.createTopicMessageStore((ActiveMQTopic) dest);
- SubscriptionInfo[] infos = store.getAllSubscriptions();
- if (infos != null){
- for (int i = 0; i < infos.length; i++) {
-
- SubscriptionInfo info = infos[i];
+ Map subscriptions=new HashMap();
+ Set destinations=adaptor.getDestinations();
+ if(destinations!=null){
+ for(Iterator iter=destinations.iterator();iter.hasNext();){
+ ActiveMQDestination dest=(ActiveMQDestination) iter.next();
+ if(dest.isTopic()){
+ TopicMessageStore
store=adaptor.createTopicMessageStore((ActiveMQTopic) dest);
+ SubscriptionInfo[] infos=store.getAllSubscriptions();
+ if(infos!=null){
+ for(int i=0;i<infos.length;i++){
+ SubscriptionInfo info=infos[i];
log.debug("Restoring durable subscription:
"+infos);
- SubscriptionKey key = new SubscriptionKey(info);
+ SubscriptionKey key=new SubscriptionKey(info);
subscriptions.put(key,info);
- }
+ }
}
}
}
}
- for (Iterator i = subscriptions.entrySet().iterator();i.hasNext();){
- Map.Entry entry = (Entry) i.next();
- SubscriptionKey key = (SubscriptionKey) entry.getKey();
- SubscriptionInfo info = (SubscriptionInfo) entry.getValue();
- addInactiveSubscription(key, info);
+ for(Iterator i=subscriptions.entrySet().iterator();i.hasNext();){
+ Map.Entry entry=(Entry) i.next();
+ SubscriptionKey key=(SubscriptionKey) entry.getKey();
+ SubscriptionInfo info=(SubscriptionInfo) entry.getValue();
+ addInactiveSubscription(key,info);
}
}
-
+
protected void addInactiveSubscription(SubscriptionKey
key,SubscriptionInfo info){
Hashtable map=brokerObjectName.getKeyPropertyList();
try{
- ObjectName objectName = new ObjectName(
- brokerObjectName.getDomain()+":"+
- "BrokerName="+map.get("BrokerName")+","+
- "Type=Subscription,"+
- "active=false,"+
- "name="+JMXSupport.encodeObjectNamePart(key.toString())+""
- );
-
- SubscriptionView view = new
InactiveDurableSubscriptionView(this,key.getClientId(),info);
+ ObjectName objectName=new
ObjectName(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")
+ +","+"Type=Subscription,"+"active=false,"+"name="
+
+JMXSupport.encodeObjectNamePart(key.toString())+"");
+ SubscriptionView view=new
InactiveDurableSubscriptionView(this,key.getClientId(),info);
registeredMBeans.add(objectName);
mbeanServer.registerMBean(view,objectName);
inactiveDurableTopicSubscribers.put(objectName,view);
- subscriptionKeys.put(key, objectName);
+ subscriptionKeys.put(key,objectName);
}catch(Exception e){
log.error("Failed to register subscription "+info,e);
}
}
-
+
public CompositeData[] browse(SubscriptionView view) throws
OpenDataException{
- List messages = getSubscriberMessages(view);
+ List messages=getSubscriberMessages(view);
CompositeData c[]=new CompositeData[messages.size()];
for(int i=0;i<c.length;i++){
try{
@@ -354,7 +336,7 @@
public TabularData browseAsTable(SubscriptionView view) throws
OpenDataException{
OpenTypeFactory
factory=OpenTypeSupport.getFactory(ActiveMQMessage.class);
- List messages = getSubscriberMessages(view);
+ List messages=getSubscriberMessages(view);
CompositeType ct=factory.getCompositeType();
TabularType tt=new TabularType("MessageList","MessageList",ct,new
String[] { "JMSMessageID" });
TabularDataSupport rc=new TabularDataSupport(tt);
@@ -363,14 +345,12 @@
}
return rc;
}
-
+
protected List getSubscriberMessages(SubscriptionView view){
- final List result = new ArrayList();
- try {
- ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
- TopicMessageStore store = adaptor.createTopicMessageStore(topic);
-
-
+ final List result=new ArrayList();
+ try{
+ ActiveMQTopic topic=new ActiveMQTopic(view.getDestinationName());
+ TopicMessageStore store=adaptor.createTopicMessageStore(topic);
store.recover(new MessageRecoveryListener(){
public void recoverMessage(Message message) throws Exception{
result.add(message);
@@ -381,71 +361,75 @@
public void finished(){}
});
}catch(Throwable e){
- log.error("Failed to browse messages for Subscription " + view,e);
+ log.error("Failed to browse messages for Subscription "+view,e);
}
return result;
}
-
- protected ObjectName[] getTopics(){
- Set set = topics.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+
+ protected ObjectName[] getTopics(){
+ Set set=topics.keySet();
+ return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
}
+
protected ObjectName[] getQueues(){
- Set set = queues.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set set=queues.keySet();
+ return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
}
+
protected ObjectName[] getTemporaryTopics(){
- Set set = temporaryTopics.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set set=temporaryTopics.keySet();
+ return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
}
+
protected ObjectName[] getTemporaryQueues(){
- Set set = temporaryQueues.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set set=temporaryQueues.keySet();
+ return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
}
-
+
protected ObjectName[] getTopicSubscribers(){
- Set set = topicSubscribers.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set set=topicSubscribers.keySet();
+ return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
}
+
protected ObjectName[] getDurableTopicSubscribers(){
- Set set = durableTopicSubscribers.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set set=durableTopicSubscribers.keySet();
+ return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
}
+
protected ObjectName[] getQueueSubscribers(){
- Set set = queueSubscribers.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set set=queueSubscribers.keySet();
+ return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
}
+
protected ObjectName[] getTemporaryTopicSubscribers(){
- Set set = temporaryTopicSubscribers.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set set=temporaryTopicSubscribers.keySet();
+ return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
}
+
protected ObjectName[] getTemporaryQueueSubscribers(){
- Set set = temporaryQueueSubscribers.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set set=temporaryQueueSubscribers.keySet();
+ return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
}
-
+
protected ObjectName[] getInactiveDurableTopicSubscribers(){
- Set set = inactiveDurableTopicSubscribers.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set set=inactiveDurableTopicSubscribers.keySet();
+ return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
}
- public Broker getContextBroker() {
+ public Broker getContextBroker(){
return contextBroker;
}
- public void setContextBroker(Broker contextBroker) {
- this.contextBroker = contextBroker;
+ public void setContextBroker(Broker contextBroker){
+ this.contextBroker=contextBroker;
}
- protected ObjectName createObjectName(ActiveMQDestination destName) throws
MalformedObjectNameException {
+ protected ObjectName createObjectName(ActiveMQDestination destName) throws
MalformedObjectNameException{
// Build the object name for the destination
Hashtable map=brokerObjectName.getKeyPropertyList();
- ObjectName objectName = new ObjectName(
- brokerObjectName.getDomain()+":"+
- "BrokerName="+map.get("BrokerName")+","+
-
"Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+
-
"Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName())
- );
+ ObjectName objectName=new
ObjectName(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")+","
+
+"Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","
+
+"Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
return objectName;
}
}