User: chirino 
  Date: 01/07/27 17:30:15

  Modified:    src/main/org/jbossmq Connection.java Mutex.java
                        SpyConnection.java SpyConnectionConsumer.java
                        SpyJMSException.java SpyMessage.java
                        SpyMessageConsumer.java SpyQueue.java
                        SpyQueueReceiver.java SpyTopic.java
                        SpyTopicSession.java SpyTopicSubscriber.java
                        Subscription.java
  Log:
  Once again many changes.
  - The logic that handled the processing of queue and topic messages
   was separated out more to make it easier to follow.
  - A QueuedTask class was created to avoid unneeded processing of queues.
  - The interface between the client-server-queues-persistence manager to handle
   DurableSubscription was too verbose; created a DurableSubscription class, and now
   SpyTopics can be inspected to see if they are being used as a DurableSubscription.
  - The MBeans that add queues and topics make it simpler to configure a queue/topic.
  
  Revision  Changes    Path
  1.3       +2 -2      jbossmq/src/main/org/jbossmq/Connection.java
  
  Index: Connection.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/Connection.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- Connection.java   2001/07/16 02:51:44     1.2
  +++ Connection.java   2001/07/28 00:30:15     1.3
  @@ -43,7 +43,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class Connection implements java.io.Serializable, javax.jms.Connection {
        //////////////////////////////////////////////////////////////
  @@ -350,7 +350,7 @@
                        serverIL.subscribe(connectionToken, req);
                        
                } catch (Exception e) {
  -                     throw new SpyJMSException("Cannot subscribe to this 
Destination",e);
  +                     throw new SpyJMSException("Cannot subscribe to this 
Destination: "+e.getMessage(),e);
                }
   
        }
  
  
  
  1.5       +7 -1      jbossmq/src/main/org/jbossmq/Mutex.java
  
  Index: Mutex.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/Mutex.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- Mutex.java        2001/07/16 02:51:44     1.4
  +++ Mutex.java        2001/07/28 00:30:15     1.5
  @@ -42,10 +42,16 @@
    * Distributable under LGPL license.
    * See terms of license at gnu.org.
    */
  +/*
  + * JBossMQ, the OpenSource JMS implementation
  + *
  + * Distributable under LGPL license.
  + * See terms of license at gnu.org.
  + */
   /**
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class Mutex
   {
  
  
  
  1.13      +9 -12     jbossmq/src/main/org/jbossmq/SpyConnection.java
  
  Index: SpyConnection.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyConnection.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- SpyConnection.java        2001/07/16 02:51:44     1.12
  +++ SpyConnection.java        2001/07/28 00:30:15     1.13
  @@ -32,7 +32,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.12 $
  + *   @version $Revision: 1.13 $
    */
   public class SpyConnection 
        extends Connection 
  @@ -64,7 +64,7 @@
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");                
                                                                                
  -             return new SpyConnectionConsumer(this, topic, null, messageSelector, 
sessionPool, maxMessages);
  +             return new SpyConnectionConsumer(this, topic, false, messageSelector, 
sessionPool, maxMessages);
        }
   
   
  @@ -90,15 +90,12 @@
                }
        }
        
  -     public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 
  -                                                     String subscriptionName,
  -                                                     String messageSelector,
  -                                                     ServerSessionPool sessionPool, 
  -                                                     int maxMessages) throws 
JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The connection is 
closed");                
  -                                                                             
  -             return new SpyConnectionConsumer(this, topic, subscriptionName, 
messageSelector, sessionPool, maxMessages);
  +     public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String 
subscriptionName, String messageSelector, ServerSessionPool sessionPool, int 
maxMessages) throws JMSException {
  +         if (closed)
  +             throw new IllegalStateException("The connection is closed");
  +
  +         SpyTopic t = new SpyTopic((SpyTopic)topic, getClientID(), 
subscriptionName);
  +         return new SpyConnectionConsumer(this, t, true, messageSelector, 
sessionPool, maxMessages);
        }
   
        // Constants -----------------------------------------------------
  @@ -129,7 +126,7 @@
                                                        int maxMessages) throws 
JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");                                                                              
                  
  -             return new SpyConnectionConsumer(this, queue, null, messageSelector, 
sessionPool, maxMessages);
  +             return new SpyConnectionConsumer(this, queue, true, messageSelector, 
sessionPool, maxMessages);
        }
   
        //Get a queue
  
  
  
  1.5       +5 -3      jbossmq/src/main/org/jbossmq/SpyConnectionConsumer.java
  
  Index: SpyConnectionConsumer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyConnectionConsumer.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- SpyConnectionConsumer.java        2001/07/16 02:51:44     1.4
  +++ SpyConnectionConsumer.java        2001/07/28 00:30:15     1.5
  @@ -19,7 +19,7 @@
    *      
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class SpyConnectionConsumer implements javax.jms.ConnectionConsumer, 
SpyConsumer {
   
  @@ -105,10 +105,12 @@
   
        static org.apache.log4j.Category cat = 
org.apache.log4j.Category.getInstance(SpyConnectionConsumer.class);
   
  +
  +
        /**
         * SpyConnectionConsumer constructor comment.
         */
  -     public SpyConnectionConsumer(Connection connection, Destination destination, 
String subscriptionName, String messageSelector, ServerSessionPool serverSessionPool, 
int maxMessages)
  +     public SpyConnectionConsumer(Connection connection, Destination destination, 
boolean actsLikeAQueue, String messageSelector, ServerSessionPool serverSessionPool, 
int maxMessages)
                throws JMSException {
   
                this.connection = connection;
  @@ -118,10 +120,10 @@
   
                subscription.destination = (SpyDestination)destination;
                subscription.messageSelector = messageSelector;
  -             subscription.durableSubscriptionName = subscriptionName;
                subscription.noLocal = false;
                subscription.listening = true;
                subscription.receiving = false;
  +             subscription.actsLikeAQueue = actsLikeAQueue;
   
                connection.addConsumer(this);
                connection.listenerChange(subscription.subscriptionId,true);
  
  
  
  1.2       +1 -1      jbossmq/src/main/org/jbossmq/SpyJMSException.java
  
  Index: SpyJMSException.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyJMSException.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyJMSException.java      2001/07/11 02:23:24     1.1
  +++ SpyJMSException.java      2001/07/28 00:30:15     1.2
  @@ -38,4 +38,4 @@
        super(arg1, arg2);
        setLinkedException(e);
   }
  -}
  +}
  \ No newline at end of file
  
  
  
  1.8       +2 -2      jbossmq/src/main/org/jbossmq/SpyMessage.java
  
  Index: SpyMessage.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyMessage.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- SpyMessage.java   2001/07/16 02:51:44     1.7
  +++ SpyMessage.java   2001/07/28 00:30:15     1.8
  @@ -23,7 +23,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.7 $
  + *   @version $Revision: 1.8 $
    */
   public class SpyMessage 
        implements Serializable, Cloneable, Message, Comparable
  @@ -451,7 +451,7 @@
                        return 1;
                }
                return (int)(messageId - sm.messageId);         
  -     }              
  +     }               
   
        
        public void doAcknowledge() throws JMSException
  
  
  
  1.8       +5 -6      jbossmq/src/main/org/jbossmq/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyMessageConsumer.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- SpyMessageConsumer.java   2001/07/16 02:51:44     1.7
  +++ SpyMessageConsumer.java   2001/07/28 00:30:15     1.8
  @@ -26,7 +26,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.7 $
  + *   @version $Revision: 1.8 $
    */
   public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
   
  @@ -87,7 +87,7 @@
   
                subscription.receiving = true;
   
  -             if ( this instanceof SpyQueueReceiver || 
subscription.durableSubscriptionName!=null ) 
  +             if ( subscription.actsLikeAQueue ) 
                        session.connection.receive(subscription, 0);
   
                synchronized (messages) {
  @@ -130,7 +130,7 @@
   
                subscription.receiving = true;
   
  -             if ( this instanceof SpyQueueReceiver || 
subscription.durableSubscriptionName!=null ) 
  +             if ( subscription.actsLikeAQueue ) 
                        session.connection.receive(subscription, timeOut);
   
                synchronized (messages) {
  @@ -179,7 +179,7 @@
                subscription.receiving = true;
                try {
   
  -                     if ( this instanceof SpyQueueReceiver || 
subscription.durableSubscriptionName!=null ) {
  +                     if ( subscription.actsLikeAQueue ) {
                                if (session.modeStop)
                                        return null;
   
  @@ -251,8 +251,7 @@
                                return false;
   
                        // Should we NACK the messages??
  -                     if( messageListener==null && !subscription.receiving && 
  -                                     (this instanceof SpyQueueReceiver || 
subscription.durableSubscriptionName!=null) ) {
  +                     if( messageListener==null && !subscription.receiving && 
subscription.actsLikeAQueue ) {
   
                                Message mes = getMessage();
                                while (mes != null ) {
  
  
  
  1.3       +3 -6      jbossmq/src/main/org/jbossmq/SpyQueue.java
  
  Index: SpyQueue.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyQueue.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyQueue.java     2001/03/02 01:12:41     1.2
  +++ SpyQueue.java     2001/07/28 00:30:15     1.3
  @@ -20,7 +20,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class SpyQueue extends SpyDestination implements java.io.Serializable, 
javax.jms.Queue, javax.naming.Referenceable {
        // Constructor ---------------------------------------------------
  @@ -31,16 +31,13 @@
                hash++;
        }
   
  -     // Public --------------------------------------------------------
  -
  -     public String getQueueName() throws JMSException
  -     {
  +     public String getQueueName() {
                return name;
        }
        
        public String toString()
        {
  -             return "Queue@"+name;
  +             return "QUEUE."+name;
        }
        
        // Object override -----------------------------------------------
  
  
  
  1.3       +2 -2      jbossmq/src/main/org/jbossmq/SpyQueueReceiver.java
  
  Index: SpyQueueReceiver.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyQueueReceiver.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyQueueReceiver.java     2001/03/02 01:12:41     1.2
  +++ SpyQueueReceiver.java     2001/07/28 00:30:15     1.3
  @@ -18,7 +18,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class SpyQueueReceiver extends SpyMessageConsumer implements QueueReceiver {
        // Attributes ----------------------------------------------------
  @@ -44,8 +44,8 @@
   
                subscription.destination = (SpyDestination)queue;
                subscription.messageSelector = selector;
  -             subscription.durableSubscriptionName = null;
                subscription.noLocal = false;
  +             subscription.actsLikeAQueue = true;
        }
   
   }
  
  
  
  1.3       +26 -4     jbossmq/src/main/org/jbossmq/SpyTopic.java
  
  Index: SpyTopic.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyTopic.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyTopic.java     2001/03/02 01:12:42     1.2
  +++ SpyTopic.java     2001/07/28 00:30:15     1.3
  @@ -20,7 +20,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class SpyTopic extends SpyDestination implements java.io.Serializable, 
javax.jms.Topic, javax.naming.Referenceable {
        // Constructor ---------------------------------------------------
  @@ -32,14 +32,15 @@
   
        // Public --------------------------------------------------------
   
  -     public String getTopicName() throws JMSException
  -     {
  +     public String getTopicName() {
                return name;
        }
        
        public String toString()
        {
  -             return "Topic@"+name;
  +             if( durableSubscriptionID != null)
  +                     return "TOPIC."+name+"."+durableSubscriptionID;
  +             return "TOPIC."+name;
        }
        
        // Object override -----------------------------------------------
  @@ -59,5 +60,26 @@
                        "org.jbossmq.SpyTopic",
                        new StringRefAddr("name", name),
                        "org.jbossmq.referenceable.SpyDestinationObjectFactory", null);
  +     }
  +
  +     DurableSubcriptionID durableSubscriptionID;
  +
  +     // Constructor ---------------------------------------------------
  +        
  +     public SpyTopic(SpyTopic topic, String clientID, String subscriptionName)
  +     {
  +             this(topic, new DurableSubcriptionID(clientID, subscriptionName));
  +     }
  +
  +     // Constructor ---------------------------------------------------
  +        
  +     public SpyTopic(SpyTopic topic, DurableSubcriptionID subid)
  +     {
  +             super(topic.getTopicName());
  +             this.durableSubscriptionID = subid;
  +     }
  +
  +     public DurableSubcriptionID getDurableSubscriptionID() {
  +             return durableSubscriptionID;
        }
   }
  
  
  
  1.5       +7 -6      jbossmq/src/main/org/jbossmq/SpyTopicSession.java
  
  Index: SpyTopicSession.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyTopicSession.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- SpyTopicSession.java      2001/07/16 02:51:44     1.4
  +++ SpyTopicSession.java      2001/07/28 00:30:15     1.5
  @@ -36,7 +36,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class SpyTopicSession 
        extends SpySession 
  @@ -59,7 +59,6 @@
        public TopicSubscriber createSubscriber(Topic topic) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
  -                                                                             
                return createSubscriber(topic,null,false);
        }
   
  @@ -67,7 +66,7 @@
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                
  -             SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal, 
messageSelector, null);
  +             SpyTopicSubscriber sub=new 
SpyTopicSubscriber(this,(SpyTopic)topic,noLocal, messageSelector);
                addConsumer(sub);
   
                return sub;
  @@ -76,8 +75,9 @@
        public TopicSubscriber createDurableSubscriber(Topic topic, String name) 
throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
  -                             
  -             SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,false, null, 
name);
  +
  +             SpyTopic t = new SpyTopic((SpyTopic)topic, connection.getClientID(), 
name);
  +             SpyTopicSubscriber sub=new SpyTopicSubscriber(this,t,false, null);
                addConsumer(sub);
   
                return sub;
  @@ -88,7 +88,8 @@
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                
  -             SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal, 
messageSelector, name);
  +             SpyTopic t = new SpyTopic((SpyTopic)topic, connection.getClientID(), 
name);
  +             SpyTopicSubscriber sub=new SpyTopicSubscriber(this,t,noLocal, 
messageSelector);
                addConsumer(sub);
   
                return sub;
  
  
  
  1.3       +6 -4      jbossmq/src/main/org/jbossmq/SpyTopicSubscriber.java
  
  Index: SpyTopicSubscriber.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyTopicSubscriber.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyTopicSubscriber.java   2001/03/02 01:12:42     1.2
  +++ SpyTopicSubscriber.java   2001/07/28 00:30:15     1.3
  @@ -22,7 +22,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class SpyTopicSubscriber 
        extends SpyMessageConsumer 
  @@ -31,7 +31,7 @@
        // Attributes ----------------------------------------------------
   
        //The topic I registered
  -     private Topic topic;
  +     private SpyTopic topic;
   
        // Public --------------------------------------------------------
   
  @@ -47,16 +47,18 @@
                return subscription.noLocal;
        }
        
  +
  +
        // Constructor ---------------------------------------------------
           
  -     SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean noLocal, String 
selector, String durableSubscriptionName) 
  +     SpyTopicSubscriber(SpyTopicSession session,SpyTopic topic,boolean noLocal, 
String selector) 
        {
                super(session, false);
                this.topic=topic;
   
                subscription.destination = (SpyDestination)topic;
                subscription.messageSelector = selector;
  -             subscription.durableSubscriptionName = durableSubscriptionName;
                subscription.noLocal = noLocal;
  +             subscription.actsLikeAQueue = topic.getDurableSubscriptionID() != null;
        }
   }
  
  
  
  1.5       +20 -10    jbossmq/src/main/org/jbossmq/Subscription.java
  
  Index: Subscription.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/Subscription.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- Subscription.java 2001/07/16 02:51:44     1.4
  +++ Subscription.java 2001/07/28 00:30:15     1.5
  @@ -18,7 +18,7 @@
    *      
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class Subscription
        implements Serializable
  @@ -32,25 +32,22 @@
        // Should this message destroy the subscription?
        public boolean destroyDurableSubscription;
   
  -     // this is not null if we want a durable subscription
  -     public String durableSubscriptionName;
  +
        // Topics might not want locally produced messages
        public boolean noLocal;
   
        // Transient Values 
  -     private transient Selector selector;
  +     public transient Selector selector;
        public transient ConnectionToken dc;
        public transient boolean listening;
        public transient boolean receiving;
   
        // Determines the consumer would accept the message.    
        public boolean accepts( SpyMessage message, boolean exclusive ) throws 
javax.jms.JMSException {
  -
  -             if( messageSelector != null ) {
  -                     if( selector==null ) 
  -                             selector = new Selector(messageSelector);
   
  -                     if( !selector.test(message) )
  +             Selector ms = getSelector();
  +             if( ms != null ) {
  +                     if( !ms.test(message) )
                                return false;
                }
   
  @@ -61,7 +58,7 @@
                                return false;
   
                        // But if the subscriber is durable, then it acts like a Queue
  -                     if( durableSubscriptionName != null ) {
  +                     if( actsLikeAQueue ) {
                                
                                if( !exclusive )
                                        return false;
  @@ -84,4 +81,17 @@
                
        }
   
  +     public boolean actsLikeAQueue;
  +
  +     // Determines the consumer would accept the message.    
  +     public Selector getSelector() throws javax.jms.JMSException {
  +
  +             if( messageSelector == null )
  +                     return null;
  +                     
  +             if( selector==null ) 
  +                     selector = new Selector(messageSelector);
  +
  +             return selector;        
  +     }
   }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to