User: user57
Date: 01/07/31 22:19:48
Modified: src/main/org/jbossmq/server Tag: jboss_buildmagic
ClientConsumer.java JBossMQService.java
JBossMQServiceMBean.java JMSDestination.java
JMSQueue.java JMSServer.java JMSTopic.java
StateManager.java StateManagerMBean.java
Log:
o updated from HEAD
Revision Changes Path
No revision
No revision
1.10.2.1 +52 -27 jbossmq/src/main/org/jbossmq/server/ClientConsumer.java
Index: ClientConsumer.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/ClientConsumer.java,v
retrieving revision 1.10
retrieving revision 1.10.2.1
diff -u -r1.10 -r1.10.2.1
--- ClientConsumer.java 2001/07/28 00:30:16 1.10
+++ ClientConsumer.java 2001/08/01 05:19:48 1.10.2.1
@@ -109,6 +109,9 @@
{
cat.debug("Queueing outbound message: "+message);
+ if( !enabled )
+ return;
+
LinkedList l = (LinkedList)destinationSubscriptions.get(
message.getJMSDestination() );
if( l == null )
throw new JMSException("No subscription found for that
destination.");
@@ -138,35 +141,29 @@
cat.debug("Adding subscription for: "+req);
req.dc = dc;
+ JMSDestination
queue=(JMSDestination)server.getJMSDestination(req.destination);
+ if (queue==null) throw new JMSException("This destination does not
exist !");
+
+ SubscriptionState subState = new SubscriptionState( req );
+ subState.dest = queue.addSubscriber(req);
+
synchronized (subscriptions ) {
- SubscriptionState subState = new SubscriptionState( req );
-
HashMap subscriptionsClone = (HashMap)subscriptions.clone();
subscriptionsClone.put(new Integer(req.subscriptionId),
subState );
subscriptions = subscriptionsClone;
LinkedList ll = (LinkedList)destinationSubscriptions.get(
req.destination );
if( ll == null ) {
-
- JMSDestination
queue=(JMSDestination)server.getJMSDestination(req.destination);
- if (queue==null) throw new JMSException("This
destination does not exist !");
-
ll = new LinkedList();
- ll.add( subState );
-
- HashMap destinationSubscriptionsClone =
(HashMap)destinationSubscriptions.clone();
- destinationSubscriptionsClone.put(req.destination, ll
);
- destinationSubscriptions =
destinationSubscriptionsClone;
- subState.dest = queue.addConsumer(req, this);
-
- } else {
- LinkedList llClone = (LinkedList)ll.clone();
- llClone.add( req );
-
- HashMap destinationSubscriptionsClone =
(HashMap)destinationSubscriptions.clone();
- destinationSubscriptionsClone.put(req.destination,
llClone);
- destinationSubscriptions =
destinationSubscriptionsClone;
+ queue.addConsumer(req,this);
}
+ else
+ ll = (LinkedList)ll.clone();
+ ll.add( subState );
+
+ HashMap destinationSubscriptionsClone =
(HashMap)destinationSubscriptions.clone();
+ destinationSubscriptionsClone.put(req.destination, ll );
+ destinationSubscriptions = destinationSubscriptionsClone;
}
}
@@ -243,15 +240,31 @@
JMSDestination queue =
server.getJMSDestination(req.subscription.destination);
if( queue == null )
throw new JMSException("The subscription's destination does
not exist");
+
+ if( enabled && req.subscription.actsLikeAQueue ) {
+ SpyMessage message = req.dest.receiveNoWait(req.subscription);
- // Is it a receiveNoWait()
- if( wait == -1 && req.subscription.actsLikeAQueue ) {
- return req.dest.receiveNoWait(req.subscription);
+ if( message != null ) {
+ AcknowledgementRequest ack = new
AcknowledgementRequest();
+ ack.destination = message.getJMSDestination();
+ ack.messageID = message.getJMSMessageID();
+ ack.subscriberId = subscriberId;
+ ack.isAck = false;
+
+ synchronized (unacknowledgedMessages) {
+ unacknowledgedMessages.put(ack, message);
+ }
+ return message;
+ }
+ }
+
+ // If not receiveNoWait()
+ if( wait != -1 ) {
+ // Notify the queue. It could be waiting for a consumer
+ req.subscription.receiving = true;
+ req.dest.notifyMessageAvailable();
}
- // Notify the queue. It could be waiting for a consumer
- req.subscription.receiving = true;
- req.dest.notifyMessageAvailable();
return null;
}
@@ -330,6 +343,9 @@
cat.debug(""+this+"->scanExclusiveQueue(queue="+queue+")");
+ if( !enabled )
+ return false;
+
Iterator i = queue.messages.iterator();
while( i.hasNext() ) {
@@ -381,6 +397,15 @@
public void setEnabled(boolean enabled) {
cat.debug(""+this+"->setEnabled(enabled="+enabled+")");
this.enabled = enabled;
+ if(enabled){
+ // queues might be waiting for messages.
+ for(Iterator it =
destinationSubscriptions.keySet().iterator();it.hasNext();){
+ SpyDestination destination = (SpyDestination)
it.next();
+ JMSDestination dest =
server.getJMSDestination(destination);
+ if(dest != null)
+ dest.notifyMessageAvailable();
+ }
+ }
}
public String toString() {
1.5.2.1 +181 -217 jbossmq/src/main/org/jbossmq/server/JBossMQService.java
Index: JBossMQService.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/JBossMQService.java,v
retrieving revision 1.5
retrieving revision 1.5.2.1
diff -u -r1.5 -r1.5.2.1
--- JBossMQService.java 2001/07/28 00:30:16 1.5
+++ JBossMQService.java 2001/08/01 05:19:48 1.5.2.1
@@ -49,7 +49,7 @@
public String getName() {
- return "JBossMQ.Server";
+ return "JBossMQ";
}
@@ -80,50 +80,21 @@
jmsServer = new JMSServer();
}
- public SpyQueue createQueue(String name) throws JMSException
+ public void createQueue(String name) throws Exception
{
- /*
- cat.debug("new queue : "+name);
-
- SpyQueue newQueue=new SpyQueue(name);
- if (destinations.containsKey(newQueue)) throw new JMSException("This
queue already exists !");
-
- JMSDestination queue=new JMSDestination(newQueue,null,this);
-
- InitialContext ctx = null;
- Context subcontext = null;
-
- try{
- //Get an InitialContext
- ctx = new InitialContext();
-
- //get the queues subcontext
- subcontext = (Context)ctx.lookup("queue");
-
- //add this queue to the jndi queue context
- subcontext.rebind(name,newQueue);
+ try {
+
+ ObjectName objectName = new ObjectName(
"JBossMQ:serivce=Queue,name="+name );
+
getServer().createMBean("org.jbossmq.server.QueueManager",objectName);
+ getServer().invoke(objectName,"init", new Object[] {}, new
String[] {});
+ getServer().invoke(objectName,"start", new Object[] {}, new
String[] {});
+
+ } catch ( Exception e ) {
+ category.warn("Could not create the destination: ", e );
+ throw e;
}
- catch(Exception e){
- //remove the queue from the destinations
- synchronized (destinations) {
- HashMap newMap=(HashMap)destinations.clone();
- newMap.remove(newQueue);
- destinations=newMap;
- }
- JMSException newE = new JMSException("Exception unbinding Queue from
the JNDI queue context");
- newE.setLinkedException(e);
- throw newE;
- }
- //Add this new JMSServerQueue to the list
- synchronized (destinations) {
- HashMap newMap=(HashMap)destinations.clone();
- newMap.put(newQueue,queue);
- destinations=newMap;
- }
- */
-
/*
//check to see if this queue already exists in the config structure
boolean queueExists = false;
@@ -173,53 +144,23 @@
*/
//return newQueue;
- return null;
}
// Administration calls
- public SpyTopic createTopic(String name) throws JMSException
+ public void createTopic(String name) throws Exception
{
- /*
- cat.debug("new topic : "+name);
-
- SpyTopic newTopic=new SpyTopic(name);
- if (destinations.containsKey(newTopic)) throw new JMSException("This
topic already exists !");
-
- JMSDestination queue=new JMSDestination(newTopic,null,this);
-
- InitialContext ctx = null;
- Context subcontext = null;
-
- try{
- //Get an InitialContext
- ctx = new InitialContext();
-
- //get the topics subcontext
- subcontext = (Context)ctx.lookup("topic");
-
- //add this topic to the jndi topic context
- subcontext.rebind(name,newTopic);
-
- }
- catch(Exception e){
- //remove the topic from the destinations
- synchronized (destinations) {
- HashMap newMap=(HashMap)destinations.clone();
- newMap.remove(newTopic);
- destinations=newMap;
- }
- JMSException newE = new JMSException("Exception binding Topic to the
JNDI topic context");
- newE.setLinkedException(e);
- throw newE;
- }
-
- //Add this new JMSServerQueue to the list
- synchronized (destinations) {
- HashMap newMap=(HashMap)destinations.clone();
- newMap.put(newTopic,queue);
- destinations=newMap;
+ try {
+
+ ObjectName objectName = new ObjectName(
"JBossMQ:serivce=Topic,name="+name );
+
getServer().createMBean("org.jbossmq.server.TopicManager",objectName);
+ getServer().invoke(objectName,"init", new Object[] {}, new
String[] {});
+ getServer().invoke(objectName,"start", new Object[] {}, new
String[] {});
+
+ } catch ( Exception e ) {
+ category.warn("Could not create the destination: ", e );
+ throw e;
}
- */
+
/*
//check to see if this topic already exists in the config structure
@@ -268,145 +209,168 @@
}
return newTopic;
*/
- return null;
}
-
- public void destroyQueue(String name) throws JMSException
- {
- /*
- cat.debug("destroy queue : "+name);
-
- SpyQueue destroyQueue=new SpyQueue(name);
- if (!destinations.containsKey(destroyQueue)) throw new
JMSException("This queue doesn't exist!");
-
- JMSDestination queue=new JMSDestination(destroyQueue,null,this);
-
- InitialContext ctx = null;
- Context subcontext = null;
- try{
- //Get an InitialContext
- ctx = new InitialContext();
-
- //get the queues subcontext
- subcontext = (Context)ctx.lookup("queue");
-
- //remove this queue to the jndi queue context
- subcontext.unbind(name);
-
- }
- catch(Exception e){
- JMSException newE = new JMSException("Exception unbinding Queue from
the JNDI queue context");
- newE.setLinkedException(e);
- throw newE;
- }
-
- //remove this new JMSServerQueue from the list
- synchronized (destinations) {
- HashMap newMap=(HashMap)destinations.clone();
- newMap.remove(destroyQueue);
- destinations=newMap;
- }
- */
-
- /*
- //check to see if this queue already exists in the config structure
- //and remove it if it does
- XElement element = null;
+ public void destroyQueue(String name) throws Exception {
+ try {
+
+ ObjectName objectName = new ObjectName(
"JBossMQ:serivce=Queue,name="+name );
+ getServer().invoke(objectName,"stop", new Object[] {}, new
String[] {});
+ getServer().invoke(objectName,"destroy", new Object[] {}, new
String[] {});
+ getServer().unregisterMBean(objectName);
+
+ } catch ( Exception e ) {
+ category.warn("Could not destory the destination: ", e );
+ throw e;
+ }
+
+ /*
+ cat.debug("destroy queue : "+name);
+
+ SpyQueue destroyQueue=new SpyQueue(name);
+ if (!destinations.containsKey(destroyQueue)) throw new JMSException("This
queue doesn't exist!");
+
+ JMSDestination queue=new JMSDestination(destroyQueue,null,this);
+
+ InitialContext ctx = null;
+ Context subcontext = null;
+
+ try{
+ //Get an InitialContext
+ ctx = new InitialContext();
+
+ //get the queues subcontext
+ subcontext = (Context)ctx.lookup("queue");
+
+ //remove this queue to the jndi queue context
+ subcontext.unbind(name);
+
+ }
+ catch(Exception e){
+ JMSException newE = new JMSException("Exception unbinding Queue from the
JNDI queue context");
+ newE.setLinkedException(e);
+ throw newE;
+ }
+
+ //remove this new JMSServerQueue from the list
+ synchronized (destinations) {
+ HashMap newMap=(HashMap)destinations.clone();
+ newMap.remove(destroyQueue);
+ destinations=newMap;
+ }
+ */
+
+ /*
+ //check to see if this queue already exists in the config structure
+ //and remove it if it does
+ XElement element = null;
+
+ try{
+ Enumeration enum = serverConfig.getElementsNamed("Queue");
+
+ while( enum.hasMoreElements() ) {
+ element = (XElement)enum.nextElement();
+ if(name.equals(element.getField("Name"))){
+ //remove this queue element from the config structure
+ element.removeFromParent();
+ break;
+ }
+
+ }
+
+ //save the new state in the config file
+ //saveConfig();
+
+
+ }
+ catch(Exception ioe){
+ ResourceAllocationException newE = new
ResourceAllocationException("Exception saving updated configuration state");
+ newE.setLinkedException(ioe);
+ throw newE;
+ }
+ */
- try{
- Enumeration enum = serverConfig.getElementsNamed("Queue");
-
- while( enum.hasMoreElements() ) {
- element = (XElement)enum.nextElement();
- if(name.equals(element.getField("Name"))){
- //remove this queue element from the config structure
- element.removeFromParent();
- break;
- }
-
- }
-
- //save the new state in the config file
- //saveConfig();
-
-
- }
- catch(Exception ioe){
- ResourceAllocationException newE = new
ResourceAllocationException("Exception saving updated configuration state");
- newE.setLinkedException(ioe);
- throw newE;
- }
- */
-
}
-
- public void destroyTopic(String name) throws JMSException
- {
- /*
- cat.debug("destroy topic : "+name);
-
- SpyTopic destroyTopic=new SpyTopic(name);
- if (!destinations.containsKey(destroyTopic)) throw new
JMSException("This topic doesn't exist!");
-
- JMSDestination queue=new JMSDestination(destroyTopic,null,this);
-
- InitialContext ctx = null;
- Context subcontext = null;
- try{
- //Get an InitialContext
- ctx = new InitialContext();
-
- //get the topics subcontext
- subcontext = (Context)ctx.lookup("topic");
-
- //remove this topic to the jndi topic context
- subcontext.unbind(name);
+ public void destroyTopic(String name) throws Exception {
+ try {
+
+ ObjectName objectName = new ObjectName(
"JBossMQ:serivce=Topic,name="+name );
+ getServer().invoke(objectName,"stop", new Object[] {}, new
String[] {});
+ getServer().invoke(objectName,"destroy", new Object[] {}, new
String[] {});
+ getServer().unregisterMBean(objectName);
+
+ } catch ( Exception e ) {
+ category.warn("Could not destory the destination: ", e );
+ throw e;
+ }
+
+ /*
+ cat.debug("destroy queue : "+name);
+
+ SpyQueue destroyQueue=new SpyQueue(name);
+ if (!destinations.containsKey(destroyQueue)) throw new JMSException("This
queue doesn't exist!");
+
+ JMSDestination queue=new JMSDestination(destroyQueue,null,this);
+
+ InitialContext ctx = null;
+ Context subcontext = null;
+
+ try{
+ //Get an InitialContext
+ ctx = new InitialContext();
+
+ //get the queues subcontext
+ subcontext = (Context)ctx.lookup("queue");
+
+ //remove this queue to the jndi queue context
+ subcontext.unbind(name);
+
+ }
+ catch(Exception e){
+ JMSException newE = new JMSException("Exception unbinding Queue from the
JNDI queue context");
+ newE.setLinkedException(e);
+ throw newE;
+ }
+
+ //remove this new JMSServerQueue from the list
+ synchronized (destinations) {
+ HashMap newMap=(HashMap)destinations.clone();
+ newMap.remove(destroyQueue);
+ destinations=newMap;
+ }
+ */
+
+ /*
+ //check to see if this queue already exists in the config structure
+ //and remove it if it does
+ XElement element = null;
+
+ try{
+ Enumeration enum = serverConfig.getElementsNamed("Queue");
+
+ while( enum.hasMoreElements() ) {
+ element = (XElement)enum.nextElement();
+ if(name.equals(element.getField("Name"))){
+ //remove this queue element from the config structure
+ element.removeFromParent();
+ break;
+ }
+
+ }
+
+ //save the new state in the config file
+ //saveConfig();
+
+
+ }
+ catch(Exception ioe){
+ ResourceAllocationException newE = new
ResourceAllocationException("Exception saving updated configuration state");
+ newE.setLinkedException(ioe);
+ throw newE;
+ }
+ */
- }
- catch(Exception e){
- JMSException newE = new JMSException("Exception unbinding Topic from
the JNDI topic context");
- newE.setLinkedException(e);
- throw newE;
- }
-
- //remove this new JMSServerQueue from the list
- synchronized (destinations) {
- HashMap newMap=(HashMap)destinations.clone();
- newMap.remove(destroyTopic);
- destinations=newMap;
- }
- */
-
- /*
- //check to see if this topic already exists in the config structure
- //and remove it if it does
- XElement element = null;
-
- try{
- Enumeration enum = serverConfig.getElementsNamed("Topic");
-
- while( enum.hasMoreElements() ) {
- element = (XElement)enum.nextElement();
- if(name.equals(element.getField("Name"))){
- //remove this topic element from the config structure
- element.removeFromParent();
- break;
- }
-
- }
-
- //save the new state in the config file
- //saveConfig();
- }
- catch(Exception ioe){
- ResourceAllocationException newE = new
ResourceAllocationException("Exception saving updated configuration state");
- newE.setLinkedException(ioe);
- throw newE;
- }
- */
-
}
}
1.6.2.1 +53 -1 jbossmq/src/main/org/jbossmq/server/JBossMQServiceMBean.java
Index: JBossMQServiceMBean.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/JBossMQServiceMBean.java,v
retrieving revision 1.6
retrieving revision 1.6.2.1
diff -u -r1.6 -r1.6.2.1
--- JBossMQServiceMBean.java 2001/07/28 00:30:16 1.6
+++ JBossMQServiceMBean.java 2001/08/01 05:19:48 1.6.2.1
@@ -75,4 +75,56 @@
// Public --------------------------------------------------------
JMSServer getJMSServer();
+
+ // Public --------------------------------------------------------
+
+
+ // Public --------------------------------------------------------
+
+
+ // Public --------------------------------------------------------
+
+
+ // Public --------------------------------------------------------
+
+ public void createQueue(String name) throws Exception;
+
+ // Public --------------------------------------------------------
+
+
+ // Public --------------------------------------------------------
+
+
+ // Public --------------------------------------------------------
+
+
+ // Public --------------------------------------------------------
+
+ public void createTopic(String name) throws Exception;
+
+ // Public --------------------------------------------------------
+
+
+ // Public --------------------------------------------------------
+
+
+ // Public --------------------------------------------------------
+
+
+ // Public --------------------------------------------------------
+
+ public void destroyQueue(String name) throws Exception;
+
+ // Public --------------------------------------------------------
+
+
+ // Public --------------------------------------------------------
+
+
+ // Public --------------------------------------------------------
+
+
+ // Public --------------------------------------------------------
+
+ public void destroyTopic(String name) throws Exception;
}
1.8.2.1 +4 -2 jbossmq/src/main/org/jbossmq/server/JMSDestination.java
Index: JMSDestination.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/JMSDestination.java,v
retrieving revision 1.8
retrieving revision 1.8.2.1
diff -u -r1.8 -r1.8.2.1
--- JMSDestination.java 2001/07/28 00:30:16 1.8
+++ JMSDestination.java 2001/08/01 05:19:48 1.8.2.1
@@ -86,7 +86,9 @@
org.apache.log4j.Category cat;
// Package protected ---------------------------------------------
- abstract JMSDestination addConsumer(Subscription sub, ClientConsumer c) throws
JMSException;
+ abstract void addConsumer(Subscription sub,ClientConsumer c) throws
JMSException;
+
+ abstract JMSDestination addSubscriber(Subscription sub) throws JMSException;
abstract public void notifyMessageAvailable();
1.1.2.1 +7 -3 jbossmq/src/main/org/jbossmq/server/JMSQueue.java
Index: JMSQueue.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/JMSQueue.java,v
retrieving revision 1.1
retrieving revision 1.1.2.1
diff -u -r1.1 -r1.1.2.1
--- JMSQueue.java 2001/07/28 00:33:38 1.1
+++ JMSQueue.java 2001/08/01 05:19:48 1.1.2.1
@@ -111,9 +111,13 @@
ExclusiveQueue exclusiveQueue;
// Package protected ---------------------------------------------
- JMSDestination addConsumer(Subscription sub,ClientConsumer c) throws
JMSException {
- cat.debug("Adding consumer: "+c+")");
+ void addConsumer(Subscription sub,ClientConsumer c) throws JMSException {
+ cat.debug("Adding consumer: "+c);
exclusiveQueue.addConsumer(c);
+ }
+
+ JMSDestination addSubscriber(Subscription sub) throws JMSException {
+ cat.debug("Adding subscriber: "+sub);
return this;
}
1.15.2.1 +17 -10 jbossmq/src/main/org/jbossmq/server/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/JMSServer.java,v
retrieving revision 1.15
retrieving revision 1.15.2.1
diff -u -r1.15 -r1.15.2.1
--- JMSServer.java 2001/07/28 00:30:16 1.15
+++ JMSServer.java 2001/08/01 05:19:48 1.15.2.1
@@ -46,8 +46,6 @@
*/
public class JMSServer
{
-
-
/////////////////////////////////////////////////////////////////////
// Attributes
/////////////////////////////////////////////////////////////////////
@@ -217,13 +215,13 @@
//A connection has sent a new message
public void addMessage(ConnectionToken dc, SpyMessage val, Long txId) throws
JMSException
{
-
cat.debug("INCOMING: (TX="+txId+")"+dc.getClientID()+" =>
"+val.getJMSDestination());
JMSDestination
queue=(JMSDestination)destinations.get(val.getJMSDestination());
if (queue==null) throw new JMSException("This destination does not
exist !");
+
//Add the message to the queue
+ val.setReadOnlyMode();
queue.addMessage(val, txId);
-
}
/**
@@ -376,11 +374,14 @@
//A connection object wants to subscribe to a Destination
public void subscribe(ConnectionToken dc, Subscription sub) throws JMSException
{
- cat.debug("Server:
subscribe(dest="+sub.destination+",idConnection="+dc.getClientID()+")");
-
- ClientConsumer ClientConsumer = getClientConsumer(dc);
-
- ClientConsumer.addSubscription(sub);
+ try {
+ cat.debug("Server:
subscribe(dest="+sub.destination+",idConnection="+dc.getClientID()+")");
+ ClientConsumer ClientConsumer = getClientConsumer(dc);
+ ClientConsumer.addSubscription(sub);
+ } catch ( JMSException e ) {
+ cat.debug("Exception:", e);
+ throw e;
+ }
}
@@ -515,11 +516,17 @@
stateManager = newStateManager;
}
+ public static final String JBOSS_VESION = "JBossMQ ver. 0.9b";
+
public void restoreMessage(SpyMessage message)
{
JMSDestination
queue=(JMSDestination)destinations.get(message.getJMSDestination());
if (queue==null) throw new RuntimeException("This destination does not
exist!");
//Add the message to the queue
queue.restoreMessage(message);
+ }
+
+ public String toString() {
+ return JBOSS_VESION;
}
}
1.1.2.1 +15 -5 jbossmq/src/main/org/jbossmq/server/JMSTopic.java
Index: JMSTopic.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/JMSTopic.java,v
retrieving revision 1.1
retrieving revision 1.1.2.1
diff -u -r1.1 -r1.1.2.1
--- JMSTopic.java 2001/07/28 00:33:38 1.1
+++ JMSTopic.java 2001/08/01 05:19:48 1.1.2.1
@@ -112,8 +112,20 @@
// Package protected ---------------------------------------------
- JMSDestination addConsumer(Subscription sub,ClientConsumer c) throws
JMSException {
+ void addConsumer(Subscription sub,ClientConsumer c) throws JMSException {
+ cat.debug("Adding consumer: "+c);
+ SpyTopic topic = (SpyTopic)sub.destination;
+ DurableSubcriptionID id = topic.getDurableSubscriptionID();
+ if( id!=null ) {
+ JMSQueue queue = getDurableSubscription(id);
+ queue.addConsumer(sub,c);
+ }
+ else {
+ sharedQueue.addConsumer(c);
+ }
+ }
+ JMSDestination addSubscriber(Subscription sub) throws JMSException {
SpyTopic topic = (SpyTopic)sub.destination;
DurableSubcriptionID id = topic.getDurableSubscriptionID();
if( id!=null ) {
@@ -129,11 +141,9 @@
}
}
- return queue.addConsumer(sub, c);
+ return queue.addSubscriber(sub);
} else {
- cat.debug("Adding consumer: "+c);
- sharedQueue.addConsumer(c);
return this;
}
}
1.4.2.1 +93 -84 jbossmq/src/main/org/jbossmq/server/StateManager.java
Index: StateManager.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/StateManager.java,v
retrieving revision 1.4
retrieving revision 1.4.2.1
diff -u -r1.4 -r1.4.2.1
--- StateManager.java 2001/07/28 00:30:16 1.4
+++ StateManager.java 2001/08/01 05:19:48 1.4.2.1
@@ -200,9 +200,14 @@
category.debug("Restarting Durable Subscription:
"+clientId+","+name+","+topicName);
SpyTopic topic=new SpyTopic(topicName);
-
JMSTopic dest =
(JMSTopic)server.getJMSDestination(topic);
- dest.createDurableSubscription(new
DurableSubcriptionID(clientId, name));
+ if( dest == null ) {
+ category.warn("Subscription topic of not
found: "+topicName);
+ category.warn("Subscription cannot be
initialized: "+clientId+","+name);
+ element.removeFromParent();
+ } else {
+ dest.createDurableSubscription(new
DurableSubcriptionID(clientId, name));
+ }
} catch (JMSException e ) {
category.error("Could not initialize a durable
subscription for : Client Id="+clientId+", Name="+name+", Topic Name="+topicName,e);
@@ -238,88 +243,92 @@
stateFile = newStateFile;
}
- public void setDurableSubscription(JMSServer server, DurableSubcriptionID sub,
SpyTopic topic) throws JMSException {
- category.debug("Checking durable subscription: "+sub+", on topic:
"+topic);
- try {
- //Set the known Ids
- Enumeration enum = stateConfig.getElementsNamed("User");
- while( enum.hasMoreElements() ) {
-
- // Match the User.Name
- XElement user = (XElement)enum.nextElement();
- if( !user.containsField("Id") ||
!user.getField("Id").equals(sub.getClientID()) )
- continue;
-
- category.debug("Found a matching ClientID
configuration section.");
-
-
- XElement subscription=null;
-
- // Match the User/DurableSubscription.Name
- Enumeration enum2 =
user.getElementsNamed("DurableSubscription");
- while( enum2.hasMoreElements() ) {
- XElement t = (XElement)enum2.nextElement();
- if(
t.getField("Name").equals(sub.getSubscriptionName())) {
- subscription = t;
- break;
- }
- }
+ public String displayStateConfig() throws Exception {
+ return stateConfig.toString();
+ }
- if( subscription == null ) {
- category.debug("The subscription was not
previously registered.");
- // it was not previously registered...
- if( topic == null )
- return;
-
- subscription = new
XElement("DurableSubscription");
- subscription.addField("Name",
sub.getSubscriptionName());
- subscription.addField("TopicName",
topic.getName());
- user.addElement(subscription);
-
- JMSTopic dest =
(JMSTopic)server.getJMSDestination(topic);
- dest.createDurableSubscription(sub);
-
- saveConfig();
-
- } else {
- category.debug("The subscription was
previously registered.");
- // it was previously registered...
- if(
subscription.getField("TopicName").equals(topic.getName()) ) {
- // and it is the same as before, do
nothing.
- return;
- } else {
- category.debug("But the topic was
different, changing the subscription.");
- // we have to change the
subscription...
- SpyTopic prevTopic = new SpyTopic(
subscription.getField("TopicName") );
- JMSTopic dest =
(JMSTopic)server.getJMSDestination(prevTopic);
- dest.destoryDurableSubscription(sub);
-
- if( topic == null ) {
-
subscription.removeFromParent();
- } else {
-
subscription.setField("TopicName", topic.getName());
- dest =
(JMSTopic)server.getJMSDestination(topic);
-
dest.createDurableSubscription(sub);
- }
-
- saveConfig();
- }
- // Subscription existed
- return;
- }
+ public void setDurableSubscription(JMSServer server, DurableSubcriptionID sub,
SpyTopic topic) throws JMSException {
+ category.debug("Checking durable subscription: " + sub + ", on topic: " +
topic);
+ try {
+ //Set the known Ids
+ Enumeration enum= stateConfig.getElementsNamed("User");
+ while (enum.hasMoreElements()) {
+
+ // Match the User.Name
+ XElement user= (XElement) enum.nextElement();
+ if (!user.containsField("Id") ||
!user.getField("Id").equals(sub.getClientID()))
+ continue;
+
+ category.debug("Found a matching ClientID configuration section.");
+
+ XElement subscription= null;
+
+ // Match the User/DurableSubscription.Name
+ Enumeration enum2= user.getElementsNamed("DurableSubscription");
+ while (enum2.hasMoreElements()) {
+ XElement t= (XElement) enum2.nextElement();
+ if (t.getField("Name").equals(sub.getSubscriptionName())) {
+ subscription= t;
+ break;
+ }
+ }
+
+ if (subscription == null) {
+ category.debug("The subscription was not previously
registered.");
+ // it was not previously registered...
+ if (topic == null)
+ return;
+
+ subscription= new XElement("DurableSubscription");
+ subscription.addField("Name", sub.getSubscriptionName());
+ subscription.addField("TopicName", topic.getName());
+ user.addElement(subscription);
+
+ JMSTopic dest= (JMSTopic) server.getJMSDestination(topic);
+ dest.createDurableSubscription(sub);
+
+ saveConfig();
+
+ } else {
+ category.debug("The subscription was previously registered.");
+ // it was previously registered...
+ if
(subscription.getField("TopicName").equals(topic.getName())) {
+ // and it is the same as before, do nothing.
+ return;
+ } else {
+ category.debug("But the topic was different, changing the
subscription.");
+ // we have to change the subscription...
+ SpyTopic prevTopic= new
SpyTopic(subscription.getField("TopicName"));
+ JMSTopic dest= (JMSTopic)
server.getJMSDestination(prevTopic);
+ dest.destoryDurableSubscription(sub);
+
+ if (topic == null) {
+ subscription.removeFromParent();
+ } else {
+ subscription.setField("TopicName", topic.getName());
+ dest= (JMSTopic) server.getJMSDestination(topic);
+ dest.createDurableSubscription(sub);
+ }
+
+ saveConfig();
+ }
+ // Subscription existed
+ }
+ return;
+ }
+
+ // Could not find that user..
+ throw new JMSException("ClientID '" + sub.getClientID() + "'
cannot create durable subscriptions.");
+
+ } catch (java.io.IOException e) {
+ JMSException newE= new JMSException("Could not setup the durable
subscription");
+ newE.setLinkedException(e);
+ throw newE;
+ } catch (org.jbossmq.xml.XElementException e) {
+ JMSException newE= new JMSException("Could not setup the durable
subscription");
+ newE.setLinkedException(e);
+ throw newE;
+ }
- // Could not find that user..
- throw new JMSException("ClientID
'"+sub.getClientID()+"' cannot create durable subscriptions.");
- }
- } catch ( java.io.IOException e ) {
- JMSException newE = new JMSException("Could not setup the
durable subscription");
- newE.setLinkedException(e);
- throw newE;
- } catch ( org.jbossmq.xml.XElementException e ) {
- JMSException newE = new JMSException("Could not setup the
durable subscription");
- newE.setLinkedException(e);
- throw newE;
- }
-
}
}
1.3.2.1 +3 -3 jbossmq/src/main/org/jbossmq/server/StateManagerMBean.java
Index: StateManagerMBean.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/StateManagerMBean.java,v
retrieving revision 1.3
retrieving revision 1.3.2.1
diff -u -r1.3 -r1.3.2.1
--- StateManagerMBean.java 2001/07/28 00:30:16 1.3
+++ StateManagerMBean.java 2001/08/01 05:19:48 1.3.2.1
@@ -60,8 +60,6 @@
public interface StateManagerMBean
extends org.jboss.util.ServiceMBean
{
-
-
// Public --------------------------------------------------------
@@ -75,4 +73,6 @@
public java.lang.String getStateFile();
public void setStateFile(java.lang.String newStateFile);
+
+ public String displayStateConfig() throws Exception;
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development