User: chirino
Date: 01/07/27 17:30:15
Modified: src/main/org/jbossmq Connection.java Mutex.java
SpyConnection.java SpyConnectionConsumer.java
SpyJMSException.java SpyMessage.java
SpyMessageConsumer.java SpyQueue.java
SpyQueueReceiver.java SpyTopic.java
SpyTopicSession.java SpyTopicSubscriber.java
Subscription.java
Log:
Once again many changes.
- The logic that handled the processing of queue and topic messages
was separated out further to make it easier to follow.
- A QueuedTask class was created to avoid unneeded processing of queues.
- The interface between the client-server-queues-persistence manager to handle
DurableSubscription was too verbose; created a durable subscription ID class
(DurableSubcriptionID in the code), and now SpyTopics can be inspected to see if
they are being used as a DurableSubscription.
- The MBeans that add queues and topics make it simpler to configure a queue/topic.
Revision Changes Path
1.3 +2 -2 jbossmq/src/main/org/jbossmq/Connection.java
Index: Connection.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/Connection.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- Connection.java 2001/07/16 02:51:44 1.2
+++ Connection.java 2001/07/28 00:30:15 1.3
@@ -43,7 +43,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class Connection implements java.io.Serializable, javax.jms.Connection {
//////////////////////////////////////////////////////////////
@@ -350,7 +350,7 @@
serverIL.subscribe(connectionToken, req);
} catch (Exception e) {
- throw new SpyJMSException("Cannot subscribe to this
Destination",e);
+ throw new SpyJMSException("Cannot subscribe to this
Destination: "+e.getMessage(),e);
}
}
1.5 +7 -1 jbossmq/src/main/org/jbossmq/Mutex.java
Index: Mutex.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/Mutex.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- Mutex.java 2001/07/16 02:51:44 1.4
+++ Mutex.java 2001/07/28 00:30:15 1.5
@@ -42,10 +42,16 @@
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
+/*
+ * JBossMQ, the OpenSource JMS implementation
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
/**
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class Mutex
{
1.13 +9 -12 jbossmq/src/main/org/jbossmq/SpyConnection.java
Index: SpyConnection.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyConnection.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- SpyConnection.java 2001/07/16 02:51:44 1.12
+++ SpyConnection.java 2001/07/28 00:30:15 1.13
@@ -32,7 +32,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.13 $
*/
public class SpyConnection
extends Connection
@@ -64,7 +64,7 @@
{
if (closed) throw new IllegalStateException("The connection is
closed");
- return new SpyConnectionConsumer(this, topic, null, messageSelector,
sessionPool, maxMessages);
+ return new SpyConnectionConsumer(this, topic, false, messageSelector,
sessionPool, maxMessages);
}
@@ -90,15 +90,12 @@
}
}
- public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
- String subscriptionName,
- String messageSelector,
- ServerSessionPool sessionPool,
- int maxMessages) throws
JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
-
- return new SpyConnectionConsumer(this, topic, subscriptionName,
messageSelector, sessionPool, maxMessages);
+ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String
subscriptionName, String messageSelector, ServerSessionPool sessionPool, int
maxMessages) throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+
+ SpyTopic t = new SpyTopic((SpyTopic)topic, getClientID(),
subscriptionName);
+ return new SpyConnectionConsumer(this, t, true, messageSelector,
sessionPool, maxMessages);
}
// Constants -----------------------------------------------------
@@ -129,7 +126,7 @@
int maxMessages) throws
JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
- return new SpyConnectionConsumer(this, queue, null, messageSelector,
sessionPool, maxMessages);
+ return new SpyConnectionConsumer(this, queue, true, messageSelector,
sessionPool, maxMessages);
}
//Get a queue
1.5 +5 -3 jbossmq/src/main/org/jbossmq/SpyConnectionConsumer.java
Index: SpyConnectionConsumer.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyConnectionConsumer.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SpyConnectionConsumer.java 2001/07/16 02:51:44 1.4
+++ SpyConnectionConsumer.java 2001/07/28 00:30:15 1.5
@@ -19,7 +19,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class SpyConnectionConsumer implements javax.jms.ConnectionConsumer,
SpyConsumer {
@@ -105,10 +105,12 @@
static org.apache.log4j.Category cat =
org.apache.log4j.Category.getInstance(SpyConnectionConsumer.class);
+
+
/**
* SpyConnectionConsumer constructor comment.
*/
- public SpyConnectionConsumer(Connection connection, Destination destination,
String subscriptionName, String messageSelector, ServerSessionPool serverSessionPool,
int maxMessages)
+ public SpyConnectionConsumer(Connection connection, Destination destination,
boolean actsLikeAQueue, String messageSelector, ServerSessionPool serverSessionPool,
int maxMessages)
throws JMSException {
this.connection = connection;
@@ -118,10 +120,10 @@
subscription.destination = (SpyDestination)destination;
subscription.messageSelector = messageSelector;
- subscription.durableSubscriptionName = subscriptionName;
subscription.noLocal = false;
subscription.listening = true;
subscription.receiving = false;
+ subscription.actsLikeAQueue = actsLikeAQueue;
connection.addConsumer(this);
connection.listenerChange(subscription.subscriptionId,true);
1.2 +1 -1 jbossmq/src/main/org/jbossmq/SpyJMSException.java
Index: SpyJMSException.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyJMSException.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyJMSException.java 2001/07/11 02:23:24 1.1
+++ SpyJMSException.java 2001/07/28 00:30:15 1.2
@@ -38,4 +38,4 @@
super(arg1, arg2);
setLinkedException(e);
}
-}
+}
\ No newline at end of file
1.8 +2 -2 jbossmq/src/main/org/jbossmq/SpyMessage.java
Index: SpyMessage.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyMessage.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- SpyMessage.java 2001/07/16 02:51:44 1.7
+++ SpyMessage.java 2001/07/28 00:30:15 1.8
@@ -23,7 +23,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public class SpyMessage
implements Serializable, Cloneable, Message, Comparable
@@ -451,7 +451,7 @@
return 1;
}
return (int)(messageId - sm.messageId);
- }
+ }
public void doAcknowledge() throws JMSException
1.8 +5 -6 jbossmq/src/main/org/jbossmq/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyMessageConsumer.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- SpyMessageConsumer.java 2001/07/16 02:51:44 1.7
+++ SpyMessageConsumer.java 2001/07/28 00:30:15 1.8
@@ -26,7 +26,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
@@ -87,7 +87,7 @@
subscription.receiving = true;
- if ( this instanceof SpyQueueReceiver ||
subscription.durableSubscriptionName!=null )
+ if ( subscription.actsLikeAQueue )
session.connection.receive(subscription, 0);
synchronized (messages) {
@@ -130,7 +130,7 @@
subscription.receiving = true;
- if ( this instanceof SpyQueueReceiver ||
subscription.durableSubscriptionName!=null )
+ if ( subscription.actsLikeAQueue )
session.connection.receive(subscription, timeOut);
synchronized (messages) {
@@ -179,7 +179,7 @@
subscription.receiving = true;
try {
- if ( this instanceof SpyQueueReceiver ||
subscription.durableSubscriptionName!=null ) {
+ if ( subscription.actsLikeAQueue ) {
if (session.modeStop)
return null;
@@ -251,8 +251,7 @@
return false;
// Should we NACK the messages??
- if( messageListener==null && !subscription.receiving &&
- (this instanceof SpyQueueReceiver ||
subscription.durableSubscriptionName!=null) ) {
+ if( messageListener==null && !subscription.receiving &&
subscription.actsLikeAQueue ) {
Message mes = getMessage();
while (mes != null ) {
1.3 +3 -6 jbossmq/src/main/org/jbossmq/SpyQueue.java
Index: SpyQueue.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyQueue.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyQueue.java 2001/03/02 01:12:41 1.2
+++ SpyQueue.java 2001/07/28 00:30:15 1.3
@@ -20,7 +20,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyQueue extends SpyDestination implements java.io.Serializable,
javax.jms.Queue, javax.naming.Referenceable {
// Constructor ---------------------------------------------------
@@ -31,16 +31,13 @@
hash++;
}
- // Public --------------------------------------------------------
-
- public String getQueueName() throws JMSException
- {
+ public String getQueueName() {
return name;
}
public String toString()
{
- return "Queue@"+name;
+ return "QUEUE."+name;
}
// Object override -----------------------------------------------
1.3 +2 -2 jbossmq/src/main/org/jbossmq/SpyQueueReceiver.java
Index: SpyQueueReceiver.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyQueueReceiver.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyQueueReceiver.java 2001/03/02 01:12:41 1.2
+++ SpyQueueReceiver.java 2001/07/28 00:30:15 1.3
@@ -18,7 +18,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyQueueReceiver extends SpyMessageConsumer implements QueueReceiver {
// Attributes ----------------------------------------------------
@@ -44,8 +44,8 @@
subscription.destination = (SpyDestination)queue;
subscription.messageSelector = selector;
- subscription.durableSubscriptionName = null;
subscription.noLocal = false;
+ subscription.actsLikeAQueue = true;
}
}
1.3 +26 -4 jbossmq/src/main/org/jbossmq/SpyTopic.java
Index: SpyTopic.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyTopic.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyTopic.java 2001/03/02 01:12:42 1.2
+++ SpyTopic.java 2001/07/28 00:30:15 1.3
@@ -20,7 +20,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyTopic extends SpyDestination implements java.io.Serializable,
javax.jms.Topic, javax.naming.Referenceable {
// Constructor ---------------------------------------------------
@@ -32,14 +32,15 @@
// Public --------------------------------------------------------
- public String getTopicName() throws JMSException
- {
+ public String getTopicName() {
return name;
}
public String toString()
{
- return "Topic@"+name;
+ if( durableSubscriptionID != null)
+ return "TOPIC."+name+"."+durableSubscriptionID;
+ return "TOPIC."+name;
}
// Object override -----------------------------------------------
@@ -59,5 +60,26 @@
"org.jbossmq.SpyTopic",
new StringRefAddr("name", name),
"org.jbossmq.referenceable.SpyDestinationObjectFactory", null);
+ }
+
+ DurableSubcriptionID durableSubscriptionID;
+
+ // Constructor ---------------------------------------------------
+
+ public SpyTopic(SpyTopic topic, String clientID, String subscriptionName)
+ {
+ this(topic, new DurableSubcriptionID(clientID, subscriptionName));
+ }
+
+ // Constructor ---------------------------------------------------
+
+ public SpyTopic(SpyTopic topic, DurableSubcriptionID subid)
+ {
+ super(topic.getTopicName());
+ this.durableSubscriptionID = subid;
+ }
+
+ public DurableSubcriptionID getDurableSubscriptionID() {
+ return durableSubscriptionID;
}
}
1.5 +7 -6 jbossmq/src/main/org/jbossmq/SpyTopicSession.java
Index: SpyTopicSession.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyTopicSession.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SpyTopicSession.java 2001/07/16 02:51:44 1.4
+++ SpyTopicSession.java 2001/07/28 00:30:15 1.5
@@ -36,7 +36,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class SpyTopicSession
extends SpySession
@@ -59,7 +59,6 @@
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
-
return createSubscriber(topic,null,false);
}
@@ -67,7 +66,7 @@
{
if (closed) throw new IllegalStateException("The session is closed");
- SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal,
messageSelector, null);
+ SpyTopicSubscriber sub=new
SpyTopicSubscriber(this,(SpyTopic)topic,noLocal, messageSelector);
addConsumer(sub);
return sub;
@@ -76,8 +75,9 @@
public TopicSubscriber createDurableSubscriber(Topic topic, String name)
throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
-
- SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,false, null,
name);
+
+ SpyTopic t = new SpyTopic((SpyTopic)topic, connection.getClientID(),
name);
+ SpyTopicSubscriber sub=new SpyTopicSubscriber(this,t,false, null);
addConsumer(sub);
return sub;
@@ -88,7 +88,8 @@
{
if (closed) throw new IllegalStateException("The session is closed");
- SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal,
messageSelector, name);
+ SpyTopic t = new SpyTopic((SpyTopic)topic, connection.getClientID(),
name);
+ SpyTopicSubscriber sub=new SpyTopicSubscriber(this,t,noLocal,
messageSelector);
addConsumer(sub);
return sub;
1.3 +6 -4 jbossmq/src/main/org/jbossmq/SpyTopicSubscriber.java
Index: SpyTopicSubscriber.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyTopicSubscriber.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyTopicSubscriber.java 2001/03/02 01:12:42 1.2
+++ SpyTopicSubscriber.java 2001/07/28 00:30:15 1.3
@@ -22,7 +22,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyTopicSubscriber
extends SpyMessageConsumer
@@ -31,7 +31,7 @@
// Attributes ----------------------------------------------------
//The topic I registered
- private Topic topic;
+ private SpyTopic topic;
// Public --------------------------------------------------------
@@ -47,16 +47,18 @@
return subscription.noLocal;
}
+
+
// Constructor ---------------------------------------------------
- SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean noLocal, String
selector, String durableSubscriptionName)
+ SpyTopicSubscriber(SpyTopicSession session,SpyTopic topic,boolean noLocal,
String selector)
{
super(session, false);
this.topic=topic;
subscription.destination = (SpyDestination)topic;
subscription.messageSelector = selector;
- subscription.durableSubscriptionName = durableSubscriptionName;
subscription.noLocal = noLocal;
+ subscription.actsLikeAQueue = topic.getDurableSubscriptionID() != null;
}
}
1.5 +20 -10 jbossmq/src/main/org/jbossmq/Subscription.java
Index: Subscription.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/Subscription.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- Subscription.java 2001/07/16 02:51:44 1.4
+++ Subscription.java 2001/07/28 00:30:15 1.5
@@ -18,7 +18,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class Subscription
implements Serializable
@@ -32,25 +32,22 @@
// Should this message destroy the subscription?
public boolean destroyDurableSubscription;
- // this is not null if we want a durable subscription
- public String durableSubscriptionName;
+
// Topics might not want locally produced messages
public boolean noLocal;
// Transient Values
- private transient Selector selector;
+ public transient Selector selector;
public transient ConnectionToken dc;
public transient boolean listening;
public transient boolean receiving;
// Determines the consumer would accept the message.
public boolean accepts( SpyMessage message, boolean exclusive ) throws
javax.jms.JMSException {
-
- if( messageSelector != null ) {
- if( selector==null )
- selector = new Selector(messageSelector);
- if( !selector.test(message) )
+ Selector ms = getSelector();
+ if( ms != null ) {
+ if( !ms.test(message) )
return false;
}
@@ -61,7 +58,7 @@
return false;
// But if the subscriber is durable, then it acts like a Queue
- if( durableSubscriptionName != null ) {
+ if( actsLikeAQueue ) {
if( !exclusive )
return false;
@@ -84,4 +81,17 @@
}
+ public boolean actsLikeAQueue;
+
+ // Determines the consumer would accept the message.
+ public Selector getSelector() throws javax.jms.JMSException {
+
+ if( messageSelector == null )
+ return null;
+
+ if( selector==null )
+ selector = new Selector(messageSelector);
+
+ return selector;
+ }
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development