User: hiram   
  Date: 00/12/23 07:48:18

  Modified:    src/java/org/spydermq SpyQueueReceiver.java
                        SpyConnection.java SpyConnectionConsumer.java
                        SpyQueueSession.java SpyDistributedConnection.java
                        SpyMessageConsumer.java SpyQueueConnection.java
                        SpyQueueBrowser.java SpyConsumer.java
                        SpySession.java SpyMessage.java
                        SpyTopicSession.java SpyTopicSubscriber.java
                        SpyXAResource.java SpyXAResourceManager.java
                        TransactionRequest.java
  Added:       src/java/org/spydermq AcknowledgementRequest.java
                        ReceiveRequest.java Subscription.java
  Removed:     src/java/org/spydermq SpyAcknowledgementItem.java
  Log:
  These changes add the following features:
  The selector is now evaluated on the server side.
  The infrastructure has been laid for durable topic subscriptions.
  The QueueBrowser has been implemented.
  Queues can now have a selector.
  
  Revision  Changes    Path
  1.8       +19 -33    spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
  
  Index: SpyQueueReceiver.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- SpyQueueReceiver.java     2000/12/21 22:33:55     1.7
  +++ SpyQueueReceiver.java     2000/12/23 15:48:14     1.8
  @@ -18,24 +18,14 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.7 $
  + *   @version $Revision: 1.8 $
    */
   public class SpyQueueReceiver extends SpyMessageConsumer implements QueueReceiver {
        // Attributes ----------------------------------------------------
   
        //The queue I registered
        private Queue queue;
  -     //Mode of this QueueReceiver
  -     boolean listening;
   
  -     // Constructor ---------------------------------------------------
  -
  -     SpyQueueReceiver(SpyQueueSession session, Queue queue) {
  -             super(session, (SpyQueue) queue);
  -             this.queue = queue;
  -             listening = false;
  -     }
  -
        // Public --------------------------------------------------------
   
        public Queue getQueue() throws JMSException {
  @@ -45,26 +35,8 @@
                return queue;
        }
   
  -     public void close() throws JMSException {
  -
  -             synchronized (messages) {
  -                     if (closed)
  -                             return;
  -
  -                     if (queue != null)
  -                             session.removeConsumer(queue, this);
   
  -                     setListening(false);
   
  -                     if (waitInReceive && messageListener == null) {
  -                             //A consumer could be waiting in receive()
  -                             messages.notify();
  -                     }
  -
  -                     closed = true;
  -             }
  -     }
  -
        public void setMessageListener(MessageListener listener) throws JMSException {
                super.setMessageListener(listener);
                setListening(listener != null);
  @@ -72,15 +44,29 @@
   
        //---   
        void setListening(boolean newvalue) throws JMSException {
  -             if (newvalue == listening)
  +             if (newvalue == subscription.listening)
                        return;
  -             listening = newvalue;
  +             subscription.listening = newvalue;
   
                if (queue != null)
  -                     session.getConnection().listenerChange(queue);
  +                     session.connection.listenerChange(subscription.subscriptionId, 
subscription.listening);
        }
  +
  +     // Constructor ---------------------------------------------------
  +
  +     SpyQueueReceiver(SpyQueueSession session, Queue queue, String selector) {
  +             super(session, (SpyQueue) queue);
  +             this.queue = queue;
  +             
  +             subscription.durableSubscriptionName = null;
  +             subscription.messageSelector = selector;
  +             subscription.durableSubscriptionName = null;
  +             subscription.noLocal = false;
  +     }
  +
  +
   
  -     public boolean isListening() {
  -             return listening;
  +     public Subscription getSubscription() {
  +             return subscription;
        }
   }
  
  
  
  1.19      +141 -173  spyderMQ/src/java/org/spydermq/SpyConnection.java
  
  Index: SpyConnection.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnection.java,v
  retrieving revision 1.18
  retrieving revision 1.19
  diff -u -r1.18 -r1.19
  --- SpyConnection.java        2000/12/21 22:33:55     1.18
  +++ SpyConnection.java        2000/12/23 15:48:14     1.19
  @@ -27,13 +27,15 @@
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
   import org.spydermq.distributed.interfaces.ConnectionReceiver;
   
  +import java.util.LinkedList;
  +
   /**
    *   This class implements javax.jms.Connection
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.18 $
  + *   @version $Revision: 1.19 $
    */
   public class SpyConnection implements Connection, Serializable {
   
  @@ -47,10 +49,15 @@
        protected String clientID;
        //the distributed object which receives messages from the JMS server
        protected SpyDistributedConnection distributedConnection;
  -     //HashMap of ConnectionQueue by Destination
  -     public HashMap destinations;
  +
        //LinkedList of all created sessions by this connection 
        HashSet createdSessions;
  +     // Numbers subscriptions
  +     int subscriptionCounter = Integer.MIN_VALUE;
  +     //Maps a destination to a LinkedList of Subscriptions
  +     public HashMap destinationSubscriptions = new HashMap();
  +     //Maps a a subsction id to a Subscription
  +     public HashMap subscriptions = new HashMap();
        //Last message ID returned
        private int lastMessageID;
        //Is the connection stopped ?
  @@ -64,6 +71,7 @@
        // Used to control tranactions
        SpyXAResourceManager spyXAResourceManager;
   
  +
        //////////////////////////////////////////////////////////////
        // Constructors
        //////////////////////////////////////////////////////////////
  @@ -71,7 +79,6 @@
        SpyConnection(DistributedJMSServer theServer, String cID, String crCN) throws 
JMSException {
                //Set the attributes
                provider = theServer;
  -             destinations = new HashMap();
                createdSessions = new HashSet();
                distributedConnection = null;
                closed = false;
  @@ -145,20 +152,11 @@
                if (!modeStop)
                        return;
                modeStop = false;
  -
  -             Iterator i = destinations.keySet().iterator();
  -             while (i.hasNext()) {
  -                     Destination d = (Destination) i.next();
  -                     ConsumerSet ci = (ConsumerSet) destinations.get(d);
  -
  -                     if (ci.getLasListeningState()) {
  -                             try {
  -                                     
provider.connectionListening(distributedConnection, true, d);
  -                             } catch (Exception e) {
  -                                     failureHandler(e, "Cannot contact the JMS 
server");
  -                             }
  -                     }
   
  +             try {
  +                     provider.setEnabled(distributedConnection, true);
  +             } catch (Exception e) {
  +                     failureHandler(e, "Cannot enable the connection with the JMS 
provider");
                }
   
                changeModeStop(modeStop);
  @@ -174,19 +172,10 @@
                        return;
                modeStop = true;
   
  -             Iterator i = destinations.keySet().iterator();
  -             while (i.hasNext()) {
  -                     Destination d = (Destination) i.next();
  -                     ConsumerSet ci = (ConsumerSet) destinations.get(d);
  -
  -                     if (ci.getLasListeningState()) {
  -                             try {
  -                                     
provider.connectionListening(distributedConnection, false, d);
  -                             } catch (Exception e) {
  -                                     failureHandler(e, "Cannot contact the JMS 
server");
  -                             }
  -                     }
  -
  +             try {
  +                     provider.setEnabled(distributedConnection, false);
  +             } catch (Exception e) {
  +                     failureHandler(e, "Cannot disable the connection with the JMS 
provider");
                }
   
                changeModeStop(modeStop);
  @@ -240,10 +229,8 @@
                try {
   
                        //Remove it from the destinations list
  -                     synchronized (destinations) {
  -                             HashMap newMap = (HashMap) destinations.clone();
  -                             newMap.remove(dest);
  -                             destinations = newMap;
  +                     synchronized (subscriptions) {
  +                             destinationSubscriptions.remove(dest);
                        }
   
                        //Notify its sessions that this TemporaryDestination is going 
to be deleted()
  @@ -326,85 +313,11 @@
                }
        }
   
  -     // The ConsumerSet inner class is used by:
  -     //
  -     //              addConsumer()
  -     //      removeConsumer()
  -     //      getConsumers()
  -     //      listenerChange()
  -     //              pickListeningConsumer()
  -     //
  -     class ConsumerSet extends HashSet {
  -             boolean lasListeningState = false;
  -
  -             boolean getLasListeningState() {
  -                     return lasListeningState;
  -             }
  -
  -             boolean listenStateChanged() {
  -                     boolean t = false;
  -
  -                     Iterator iter = iterator();
  -                     while (iter.hasNext()) {
  -                             SpyConsumer c = (SpyConsumer) iter.next();
  -                             if (c.isListening()) {
  -                                     t = true;
  -                                     break;
  -                             }
  -                     }
  -
  -                     if (t == lasListeningState) {
  -                             return false;
  -                     }
  -
  -                     lasListeningState = t;
  -                     return true;
  -             }
  -     }
  -
  -     /**
  -      * Called whenever a consumer changes his listening state on a destination.
  -      * We see if the consumer change, changed the overall listening state for the 
destination.
  -      * Creation date: (11/16/2000 2:20:22 PM)
  -      * @return org.spydermq.distributed.interfaces.DistributedJMSServer
  -      */
  -     public void listenerChange(Destination d) throws JMSException {
   
  -             if (closed)
  -                     throw new IllegalStateException("The connection is closed");
  -             if (distributedConnection == null)
  -                     createReceiver();
   
  -             ConsumerSet ci = (ConsumerSet) destinations.get(d);
  -             
  -             if( ci == null ) 
  -                     return;                 
  -             if (ci.listenStateChanged()) {
  -                     try {
  -                             if (ci.getLasListeningState()) {
  -                                     
provider.connectionListening(distributedConnection, true, d);
  -                             } else {
  -                                     
provider.connectionListening(distributedConnection, false, d);
  -                             }
  -                     } catch (Exception e) {
  -                             failureHandler(e, "Cannot contact the JMS server");
  -                     }
  -             }
   
  -     }
   
  -     /**
  -      * @return org.spydermq.distributed.interfaces.DistributedJMSServer
  -      */
  -     SpyMessage queueReceive(Queue queue, long wait) throws JMSException {
   
  -             try {
  -                     return provider.queueReceive(distributedConnection, queue, 
wait);
  -             } catch (Exception e) {
  -                     failureHandler(e, "Cannot create a ConnectionReceiver");
  -                     return null;
  -             }
  -     }
   
        ////////////////////////////////////////////////////////////////////
        // Protected
  @@ -428,14 +341,7 @@
                }
        }
   
  -     // used to acknowledge a message
  -     protected void send(SpyAcknowledgementItem item) throws JMSException {
  -             try {
  -                     provider.acknowledge(distributedConnection, item);
  -             } catch (Exception e) {
  -                     failureHandler(e, "Cannot acknowlege a message.");
  -             }
  -     }
  +
   
        //Send a message to the provider
        void sendToServer(SpyMessage mes) throws JMSException {
  @@ -464,115 +370,177 @@
   
        }
   
  +
        //A new Consumer has been created for the Destination dest
  -     void addConsumer(Destination dest, SpyConsumer consumer) throws JMSException {
  +     void addConsumer(SpyConsumer consumer) throws JMSException {
                if (closed)
                        throw new IllegalStateException("The connection is closed");
                if (distributedConnection == null)
                        createReceiver();
   
  -             Log.log("Connection: addConsumer(dest=" + dest.toString() + ")");
  +             Subscription req = consumer.getSubscription();
  +             req.subscriptionId = subscriptionCounter++;
  +             req.dc = distributedConnection;
   
  -             try {
  +             Log.log("Connection: addConsumer(dest=" + req.destination.toString() + 
")");
   
  -                     synchronized (destinations) {
  +             
  +             try {
   
  -                             ConsumerSet consumerSet = (ConsumerSet) 
destinations.get(dest);
  +                     synchronized (subscriptions ) {
   
  -                             if (consumerSet == null) {
  -                                     consumerSet = new ConsumerSet();
  -                                     consumerSet.add(consumer);
  -                                     HashMap newDestinations = (HashMap) 
destinations.clone();
  -                                     newDestinations.put(dest, consumerSet);
  -                                     destinations = newDestinations;
  -                                     provider.subscribe(distributedConnection, 
dest);
  -                             } else {
  -                                     consumerSet.add(consumer);
  +                             subscriptions.put(new Integer(req.subscriptionId), 
consumer );
  +                                     
  +                             LinkedList ll = 
(LinkedList)destinationSubscriptions.get( req.destination );
  +                             if( ll == null ) {
  +                                     ll = new LinkedList();
  +                                     destinationSubscriptions.put(req.destination, 
ll );
                                }
  +
  +                             ll.add( consumer );
                        }
   
  +                     provider.subscribe(distributedConnection, req);
  +                     
                } catch (Exception e) {
                        failureHandler(e, "Cannot subscribe to this Destination");
                }
   
        }
   
  -     //Gets all the consumers subscribed to a destination
  -     public SpyConsumer[] getConsumers(Destination dest) throws JMSException {
  +     /**
  +      * @return org.spydermq.distributed.interfaces.DistributedJMSServer
  +      */
  +     SpyMessage[] browse(Queue queue, String selector) throws JMSException {
   
  -             if (closed)
  -                     throw new IllegalStateException("The connection is closed");
  -             if (distributedConnection == null)
  -                     createReceiver();
  +             try {
  +                     return provider.browse(distributedConnection, queue, selector);
  +             } catch (Exception e) {
  +                     failureHandler(e, "Cannot browse the Queue.");
  +                     return null;
  +             }
  +     }
   
  -             synchronized (destinations) {
  -                     ConsumerSet consumerSet = (ConsumerSet) destinations.get(dest);
  -                     if (consumerSet == null || consumerSet.size() == 0)
  -                             return null;
  +     //Gets the first consumer that is listening to a destination.   
  +     public void deliver(ReceiveRequest requests[]) throws JMSException {
   
  -                     SpyConsumer rc[] = new SpyConsumer[consumerSet.size()];
  -                     return (SpyConsumer[]) consumerSet.toArray(rc);
  -             }
  +             HashSet consumersUsed = new HashSet();
  +
  +             for( int i=0; i < requests.length; i++ ) {
  +
  +                     if( requests[i].subscriptionId != null ) {
  +
  +                             SpyConsumer consumer = (SpyConsumer)subscriptions.get( 
requests[i].subscriptionId );
  +                             if( consumer == null ) {
  +                                     send( 
requests[i].message.getAcknowledgementRequest(false) );
  +                                     Log.log("WARNING: NACK issued due to non 
existent subscription");
  +                                     continue;
  +                             }
   
  +                             requests[i].message.shouldAck = true;
  +                             consumer.addMessage(requests[i].message);
  +                             consumersUsed.add(consumer);
  +                             
  +                     } else {                                        
  +                             
  +                             LinkedList ll = 
(LinkedList)destinationSubscriptions.get( requests[i].message.getJMSDestination() );
  +                             if( ll == null ) {
  +                                     Log.log("WARNING: Received message but had no 
subscribers for it");
  +                                     continue;
  +                             }
  +                             
  +                             requests[i].message.shouldAck = false;
  +                             Iterator iter = ll.iterator();
  +                             while( iter.hasNext() ) {
  +                                     
  +                                     SpyConsumer consumer = 
(SpyConsumer)iter.next();
  +                                     consumer.addMessage(requests[i].message);
  +                                     consumersUsed.add(consumer);
  +
  +                             }                               
  +                     }                                               
  +             }
  +
  +             Iterator iter = consumersUsed.iterator();
  +             while( iter.hasNext() ) {
  +                     SpyConsumer consumer = (SpyConsumer)iter.next();
  +                     consumer.processMessages();
  +             }
        }
   
  -     //Gets the first consumer that is listening to a destination.   
  -     public SpyConsumer pickListeningConsumer(Destination dest) throws JMSException 
{
  +     /**
  +      * Called whenever a consumer changes his listening state on a destination.
  +      * We see if the consumer change, changed the overall listening state for the 
destination.
  +      * Creation date: (11/16/2000 2:20:22 PM)
  +      * @return org.spydermq.distributed.interfaces.DistributedJMSServer
  +      */
  +     public void listenerChange(int subscriptionId, boolean state) throws 
JMSException {
   
                if (closed)
                        throw new IllegalStateException("The connection is closed");
                if (distributedConnection == null)
                        createReceiver();
  -
  -             synchronized (destinations) {
  -
  -                     ConsumerSet consumerSet = (ConsumerSet) destinations.get(dest);
   
  -                     if (consumerSet == null || consumerSet.size() == 0) {
  -                             return null;
  -                     } else {
  -                             Iterator i = consumerSet.iterator();
  -                             while (i.hasNext()) {
  -                                     SpyConsumer c = (SpyConsumer) i.next();
  -                                     if (c.isListening() || c.isReceiving()) {
  -                                             return c;
  -                                     }
  -                             }
  -                     }
  +             try {
  +                     provider.listenerChange(distributedConnection, subscriptionId, 
state);
  +             } catch (Exception e) {
  +                     failureHandler(e, "Cannot contact the JMS server");
                }
   
  -             return null;
  +     }
   
  +     /**
  +      * @return org.spydermq.distributed.interfaces.DistributedJMSServer
  +      */
  +     SpyMessage receive(Subscription sub, long wait) throws JMSException {
  +
  +             try {
  +                     return provider.receive(distributedConnection, 
sub.subscriptionId, wait);
  +             } catch (Exception e) {
  +                     failureHandler(e, "Cannot create a ConnectionReceiver");
  +                     return null;
  +             }
        }
   
        //A consumer does not need to recieve the messages from a Destination 
  -     void removeConsumer(Destination dest, SpyConsumer who) throws JMSException {
  +     void removeConsumer(SpyConsumer consumer) throws JMSException {
   
                if (distributedConnection == null)
                        createReceiver();
   
  -             Log.log("Connection: removeSession(dest=" + dest.toString() + ")");
  +             Subscription req = consumer.getSubscription();
  +             Log.log("Connection: removeSession(dest=" + req.destination + ")");
   
                try {
  -
  -                     synchronized (destinations) {
   
  -                             ConsumerSet consumerSet = (ConsumerSet) 
destinations.get(dest);
  -                             if (consumerSet == null) 
  -                                     throw new RuntimeException("Destination does 
not have any consumers.");
  -                             
  -                             consumerSet.remove(who);
  +                     provider.unsubscribe(distributedConnection, 
req.subscriptionId);
  +                     
  +                     synchronized (subscriptions ) {
   
  -                             if ( consumerSet.isEmpty() ) {
  -                                     destinations.remove(dest);
  -                                     provider.unsubscribe(distributedConnection, 
dest);
  +                             subscriptions.remove(new Integer(req.subscriptionId));
  +                                     
  +                             LinkedList ll = 
(LinkedList)destinationSubscriptions.get( req.destination );
  +                             if( ll != null ) {
  +                                     ll.remove( req );
  +                                     if( ll.size() == 0 ) {
  +                                             
destinationSubscriptions.remove(req.destination);
  +                                     }
                                }
  -
  +                             
                        }
   
                } catch (Exception e) {
                        failureHandler(e, "Cannot unsubscribe to this destination");
                }
   
  +     }
  +
  +     // used to acknowledge a message
  +     protected void send(AcknowledgementRequest item) throws JMSException {
  +             try {
  +                     provider.acknowledge(distributedConnection, item);
  +             } catch (Exception e) {
  +                     failureHandler(e, "Cannot acknowlege a message.");
  +             }
        }
   }
  
  
  
  1.2       +15 -2     spyderMQ/src/java/org/spydermq/SpyConnectionConsumer.java
  
  Index: SpyConnectionConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnectionConsumer.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyConnectionConsumer.java        2000/12/21 22:33:56     1.1
  +++ SpyConnectionConsumer.java        2000/12/23 15:48:15     1.2
  @@ -19,7 +19,7 @@
    *      
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class SpyConnectionConsumer implements javax.jms.ConnectionConsumer, 
SpyConsumer {
   
  @@ -35,6 +35,8 @@
        LinkedList queue = new LinkedList();
        // Is the ConnectionConsumer closed?
        boolean closed;
  +     // The subscription info the consumer
  +     Subscription subscription;
   
        /**
         * SpyConnectionConsumer constructor comment.
  @@ -47,20 +49,27 @@
                this.serverSessionPool = serverSessionPool;
                this.maxMessages = maxMessages;
   
  -             connection.addConsumer(destination, this);
  +             subscription.destination = (SpyDestination)destination;
  +             subscription.messageSelector = messageSelector;
  +             subscription.durableSubscriptionName = null;
  +             subscription.noLocal = false;
   
  +             connection.addConsumer(this);
  +
        }
   
        public void addMessage(SpyMessage mes) throws JMSException {
                queue.addLast(mes);
        }
   
  +
  +
        /**
         * close method comment.
         */
        public void close() throws javax.jms.JMSException {
   
  -             connection.removeConsumer(destination, this);
  +             connection.removeConsumer(this);
                closed = true;
   
        }
  @@ -109,5 +118,9 @@
   
                }
   
  +     }
  +
  +     public Subscription getSubscription() {
  +             return subscription;
        }
   }
  
  
  
  1.10      +11 -14    spyderMQ/src/java/org/spydermq/SpyQueueSession.java
  
  Index: SpyQueueSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- SpyQueueSession.java      2000/12/21 22:33:55     1.9
  +++ SpyQueueSession.java      2000/12/23 15:48:15     1.10
  @@ -16,21 +16,20 @@
   import javax.jms.QueueBrowser;
   import javax.jms.DeliveryMode;
   import javax.jms.XAQueueSession;
  +import javax.jms.MessageListener;
   
   import java.util.HashSet;
   import java.util.HashMap;
   import java.util.Iterator;
   
   
  -import javax.jms.MessageListener;
  -
   /**
    *   This class implements javax.jms.QueueSession and javax.jms.XAQueueSession
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.9 $
  + *   @version $Revision: 1.10 $
    */
   public class SpyQueueSession 
        extends SpySession 
  @@ -52,18 +51,14 @@
   
        public QueueBrowser createBrowser(Queue queue) throws JMSException
        {
  -             if (closed) throw new IllegalStateException("The session is closed");  
         
  -                                                                             
  -             //Not yet implemented
  -             return null;
  +             if (closed) throw new IllegalStateException("The session is closed");
  +             return new SpyQueueBrowser(this, queue, null);
        }
   
        public QueueBrowser createBrowser(Queue queue,String messageSelector) throws 
JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
  -                                                                             
  -             //Not yet implemented
  -             return createBrowser(queue);
  +             return new SpyQueueBrowser(this, queue, null);
        }
   
        public Queue createQueue(String queueName) throws JMSException
  @@ -77,7 +72,7 @@
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
   
  -             SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue);
  +             SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue,null);
                addConsumer(queue,receiver);
                
                return receiver;
  @@ -86,9 +81,11 @@
        public QueueReceiver createReceiver(Queue queue, String messageSelector) 
throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
  -                                                                             
  -             //Not yet implemented
  -             return createReceiver(queue);
  +
  +             SpyQueueReceiver receiver=new 
SpyQueueReceiver(this,queue,messageSelector);
  +             addConsumer(queue,receiver);
  +             
  +             return receiver;
        }
   
        public QueueSender createSender(Queue queue) throws JMSException
  @@ -116,7 +113,7 @@
        {
                
                super.setMessageListener(listener);
  -             sessionConsumer = new SpyQueueReceiver(this, null);
  +             sessionConsumer = new SpyQueueReceiver(this, null,null);
   
        }
   }
  
  
  
  1.10      +7 -1      spyderMQ/src/java/org/spydermq/SpyDistributedConnection.java
  
  Index: SpyDistributedConnection.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyDistributedConnection.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- SpyDistributedConnection.java     2000/12/12 20:58:25     1.9
  +++ SpyDistributedConnection.java     2000/12/23 15:48:15     1.10
  @@ -8,6 +8,7 @@
   
   import java.util.HashMap;
   import java.io.Serializable;
  +
   import org.spydermq.distributed.interfaces.ConnectionReceiver;
   import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
   
  @@ -15,8 +16,9 @@
    *   This class is the broker point of view on a SpyConnection (it contains a 
ConnectionReceiver)
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
  + *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.9 $
  + *   @version $Revision: 1.10 $
    */
   public class SpyDistributedConnection 
        implements Serializable
  @@ -68,5 +70,9 @@
                if (cr!=null && cr instanceof java.rmi.Remote) {
                        
java.rmi.server.UnicastRemoteObject.unexportObject((java.rmi.Remote)cr, true);
                }
  +     }
  +
  +     public String toString() {
  +             return "SpyDistributedConnection:"+clientID;
        }
   }
  
  
  
  1.8       +59 -61    spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- SpyMessageConsumer.java   2000/12/21 22:33:55     1.7
  +++ SpyMessageConsumer.java   2000/12/23 15:48:15     1.8
  @@ -24,7 +24,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.7 $
  + *   @version $Revision: 1.8 $
    */
   abstract public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
   
  @@ -34,19 +34,16 @@
        MessageListener messageListener;
        //Am I closed ?
        protected boolean closed;
  -     //Do I have a selector
  -     public Selector selector;
  -     //The message selector
  -     public String messageSelector;
  +     // The subscription structure should be filled out by the descendant
  +     Subscription subscription = new Subscription();
  +
        //List of Pending messages (not yet delivered)
        LinkedList messages;
  -     //Is the consumer sleeping in a receive() ?
  -     boolean waitInReceive;
  +
        //The destination this consumer is getting messages from
        SpyDestination destination;
  -     //Am I in noLocal mode ?
  -     boolean noLocal;
   
  +
        // Constructor ---------------------------------------------------
   
        SpyMessageConsumer(SpySession s, SpyDestination dest) {
  @@ -54,10 +51,7 @@
                destination = dest;
                messageListener = null;
                closed = false;
  -             selector = null;
  -             messageSelector = null;
                messages = new LinkedList();
  -             waitInReceive = false;
        }
   
        // Public --------------------------------------------------------
  @@ -66,7 +60,7 @@
                if (closed)
                        throw new IllegalStateException("The MessageConsumer is 
closed");
   
  -             return messageSelector;
  +             return subscription.messageSelector;
        }
   
        public MessageListener getMessageListener() throws JMSException {
  @@ -79,12 +73,11 @@
        public void setMessageListener(MessageListener listener) throws JMSException {
                if (closed)
                        throw new IllegalStateException("The MessageConsumer is 
closed");
  -             if (waitInReceive)
  +             if (subscription.receiving)
                        throw new JMSException("This MessageConsumer is waiting in 
receive() !");
   
                messageListener = listener;
   
  -             //session.run();                
        }
   
        public Message receive() throws JMSException {
  @@ -96,10 +89,12 @@
                if (destination == null)
                        throw new JMSException("No assigned destination.");
   
  -             waitInReceive = true;
  +             Log.log("Subscription="+subscription);
  +             subscription.receiving = true;
  +             Log.log("Subscription assignment finished");
   
  -             if (!isListening() && this instanceof SpyQueueReceiver)
  -                     session.connection.queueReceive((SpyQueue) destination, 0);
  +             if ( this instanceof SpyQueueReceiver || 
subscription.durableSubscriptionName!=null ) 
  +                     session.connection.receive(subscription, 0);
   
                synchronized (messages) {
   
  @@ -122,7 +117,7 @@
                                newE.setLinkedException(e);
                                throw newE;
                        } finally {
  -                             waitInReceive = false;
  +                             subscription.receiving = false;
                        }
                }
   
  @@ -141,10 +136,10 @@
   
                long endTime = System.currentTimeMillis() + timeOut;
   
  -             waitInReceive = true;
  +             subscription.receiving = true;
   
  -             if (!isListening() && this instanceof SpyQueueReceiver)
  -                     session.connection.queueReceive((SpyQueue) destination, 
timeOut);
  +             if ( this instanceof SpyQueueReceiver || 
subscription.durableSubscriptionName!=null ) 
  +                     session.connection.receive(subscription, timeOut);
   
                synchronized (messages) {
   
  @@ -177,7 +172,7 @@
                                newE.setLinkedException(e);
                                throw newE;
                        } finally {
  -                             waitInReceive = false;
  +                             subscription.receiving = false;
                        }
                }
   
  @@ -191,14 +186,13 @@
                if (destination == null)
                        throw new JMSException("No assigned destination.");
   
  -             waitInReceive = true;
  +             subscription.receiving = true;
                try {
  -
  -                     if (!isListening() && this instanceof SpyQueueReceiver) {
   
  +                     if ( this instanceof SpyQueueReceiver || 
subscription.durableSubscriptionName!=null ) {
                                if (session.modeStop)
                                        return null;
  -                             return session.connection.queueReceive((SpyQueue) 
destination, -1);
  +                             return session.connection.receive(getSubscription(), 
-1);
                        }
   
                        synchronized (messages) {
  @@ -210,19 +204,31 @@
                        }
   
                } finally {
  -                     waitInReceive = false;
  +                     subscription.receiving = false;
                }
        }
   
  -     abstract public void close() throws JMSException;
  -
  -     //Package protected - Not part of the spec
  +     public void close() throws JMSException {
  +             
  +             synchronized (messages) {
  +                     if (closed)
  +                             return;
   
  -     void setSelector(Selector selector, String messageSelector) {
  -             this.selector = selector;
  -             this.messageSelector = messageSelector;
  +                     if (destination != null)
  +                             session.removeConsumer(destination, this);
  +                                             
  +                     if ( subscription.receiving && messageListener == null) {
  +                             //A consumer could be waiting in receive()
  +                             messages.notify();
  +                     }
  +                     
  +                     closed = true;
  +             }
  +             
        }
   
  +
  +
        SpyMessage getMessage() {
                synchronized (messages) {
   
  @@ -239,28 +245,14 @@
                                                continue;
                                        }
   
  -                                     if (selector != null) {
  -                                             if (!selector.test(mes)) {
  -                                                     Log.log("SessionQueue: I 
dropped a message (selector)");
  -                                                     continue;
  -                                             } else {
  -                                                     Log.log("SessionQueue: 
selector evaluates TRUE");
  -                                             }
  -                                     }
  -
  -                                     if (noLocal && 
mes.producerClientId.equals(session.connection.clientID)) {
  -                                             Log.notice("SessionQueue: I dropped a 
message (noLocal)");
  -                                             continue;
  -                                     }
  -
                                        //the SAME Message object is put in different 
SessionQueues
                                        //when we deliver it, we have to clone() it to 
insure independance
                                        SpyMessage message = mes.myClone();
  -                                     message.setSpySession(session);
  +                                     message.session = session;
   
  -                                     if (session.transacted) {
  +                                     if (message.shouldAck && session.transacted) {
                                                
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, 
message);
  -                                     } else if (session.acknowledgeMode == 
session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE) {
  +                                     } else if (message.shouldAck && 
session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == 
session.DUPS_OK_ACKNOWLEDGE) {
                                                message.doAcknowledge();
                                        }
   
  @@ -279,7 +271,15 @@
        public void addMessage(SpyMessage mes) throws JMSException {
                synchronized (messages) {
                        //Add a message to the queue
  -                     messages.addLast(mes);
  +                     if( subscription.accepts(mes, mes.shouldAck) ) {
  +                             messages.addLast(mes);
  +                     } else {
  +                             if( mes.shouldAck ) {
  +                                     Log.log("WARNING: NACK issued. The 
subscription did not accept the message");
  +                                     session.connection.send( 
mes.getAcknowledgementRequest(false) );
  +                             }
  +                     }
  +                     
                }
        }
   
  @@ -290,7 +290,7 @@
                                return false;
   
                        if (messageListener == null) {
  -                             if (!waitInReceive) {
  +                             if (!subscription.receiving) {
   
                                        // If no Listener and No reciver is waiting 
for a message
                                        // Then we neg ack the message back to the 
server in the queue case.
  @@ -300,9 +300,9 @@
                                                while (mes == null) {
   
                                                        Log.log("Got unrequested 
message, sending NACK for: " + mes);
  -                                                     SpyAcknowledgementItem item = 
new SpyAcknowledgementItem();
  -                                                     item.jmsDestination = 
mes.getJMSDestination();
  -                                                     item.jmsMessageID = 
mes.getJMSMessageID();
  +                                                     AcknowledgementRequest item = 
new AcknowledgementRequest();
  +                                                     item.destination = 
mes.getJMSDestination();
  +                                                     item.messageID = 
mes.getJMSMessageID();
                                                        item.isAck = false;
   
                                                        session.connection.send(item);
  @@ -334,13 +334,11 @@
                return true;
        }
   
  -     abstract public boolean isListening();
  -
  -     public boolean isReceiving() {
  -             return waitInReceive;
  -     }
  -
        public void processMessages() throws JMSException {
                session.mutex.notifyLock();
  +     }
  +
  +     public Subscription getSubscription() {
  +             return subscription;
        }
   }
  
  
  
  1.4       +0 -0      spyderMQ/src/java/org/spydermq/SpyQueueConnection.java
  
  Index: SpyQueueConnection.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueConnection.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SpyQueueConnection.java   2000/12/21 22:33:55     1.3
  +++ SpyQueueConnection.java   2000/12/23 15:48:15     1.4
  @@ -24,7 +24,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class SpyQueueConnection 
        extends SpyConnection 
  
  
  
  1.3       +29 -9     spyderMQ/src/java/org/spydermq/SpyQueueBrowser.java
  
  Index: SpyQueueBrowser.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueBrowser.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyQueueBrowser.java      2000/12/12 05:58:58     1.2
  +++ SpyQueueBrowser.java      2000/12/23 15:48:15     1.3
  @@ -9,42 +9,62 @@
   import javax.jms.QueueBrowser;
   import javax.jms.Queue;
   import javax.jms.JMSException;
  +
   import java.util.Enumeration;
  +import java.util.LinkedList;
  +import java.util.Vector;
   
   /**
    *   This class implements javax.jms.QueueBrowser
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class SpyQueueBrowser 
        implements QueueBrowser 
   {
  +
  +     boolean closed;
  +     // The destination this browser will browse messages from
  +     Queue destination;
  +     // String Selector
  +     String selector;
  +     // The QueueSession this was created with
  +     SpyQueueSession session;
   
  -     //Public
  +     SpyQueueBrowser( SpyQueueSession session,Queue destination,String selector ) {
  +             this.destination=destination;
  +             this.session=session;
  +             this.selector=selector;
  +     }
   
        public Queue getQueue() throws JMSException
        {
  -             //Nor implemented yet
  -             return null;
  +             return destination;
        }
        
        public String getMessageSelector() throws JMSException
        {
  -             //Nor implemented yet
  -             return null;
  +             return selector;
        }
        
        public Enumeration getEnumeration() throws JMSException
        {
  -             //Nor implemented yet
  -             return null;
  +             if( closed )
  +                     throw new JMSException("The QueueBrowser was closed");
  +                     
  +             SpyMessage data[] = session.connection.browse(destination, selector);
  +             Vector v = new Vector( data.length );
  +             for( int i=0; i < data.length; i++ ) {
  +                     v.addElement(data[i]);
  +             }
  +             return v.elements();
        }
        
        public void close() throws JMSException
        {
  -             //Nor implemented yet
  +             closed = true;
                return;
        }       
   }
  
  
  
  1.2       +2 -4      spyderMQ/src/java/org/spydermq/SpyConsumer.java
  
  Index: SpyConsumer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConsumer.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyConsumer.java  2000/12/21 22:33:56     1.1
  +++ SpyConsumer.java  2000/12/23 15:48:15     1.2
  @@ -14,16 +14,14 @@
    *      
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public interface SpyConsumer 
   {
        // A ConnectionReceiver uses this method to load a Consumer with a message
        public void addMessage(SpyMessage mes) throws JMSException;
  -     // This is used the Connection class (it maintains a list of consumers) to see 
who is receiving messages
  -     public boolean isListening();
  -     // This is used the Connection class (it maintains a list of consumers) to see 
who is receiving messages
  -     public boolean isReceiving();
        // This is called by a ConnectionReceiver after it is finished loading 
messages into the consumer.
        public void processMessages() throws JMSException;
  +     // This is used to know what type of messages the consumer wants
  +     public Subscription getSubscription();
   }
  
  
  
  1.17      +5 -9      spyderMQ/src/java/org/spydermq/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
  retrieving revision 1.16
  retrieving revision 1.17
  diff -u -r1.16 -r1.17
  --- SpySession.java   2000/12/21 22:33:55     1.16
  +++ SpySession.java   2000/12/23 15:48:15     1.17
  @@ -34,7 +34,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.16 $
  + *   @version $Revision: 1.17 $
    */
   abstract public class SpySession 
        implements Runnable, Session, XASession
  @@ -48,7 +48,7 @@
        //The messageListener for this session
        private MessageListener messageListener;
        //The connection object to which this session is linked
  -     protected SpyConnection connection;
  +     public SpyConnection connection;
        // This consumer is the consumer that receives messages for the MessageListener
        // assigned to the session.  The SpyConnectionConsumer delivers messages to him
        SpyMessageConsumer sessionConsumer;
  @@ -213,7 +213,7 @@
        {
                synchronized (runLock) {
                                
  -                     Log.log("SpySession: Message delivery started");
  +                     Log.log("SpySession: run()");
                        
                        boolean done=false;     
                        while (!done) {
  @@ -256,8 +256,6 @@
                        }
                
                }
  -             
  -             Log.log("SpySession: Message delivery ended");
        }
   
        public synchronized void close() throws JMSException
  @@ -389,7 +387,7 @@
                Log.log("Session: 
removeConsumer(Destination="+dest.toString()+",MessageConsumer="+who.toString()+")");
   
                synchronized (connection) {                     
  -                     connection.removeConsumer(dest, who );          
  +                     connection.removeConsumer( who );
                }
                
                consumers.remove( who );
  @@ -405,14 +403,12 @@
                        consumers.add( who );                   
                }
   
  -             connection.addConsumer(dest, who);
  +             connection.addConsumer(who);
                
        }
   
        
  -     public SpyConnection getConnection() {
  -             return connection;
  -     }
  +
   
        
        //called by a MessageProducer object which needs to publish a message
  
  
  
  1.10      +32 -41    spyderMQ/src/java/org/spydermq/SpyMessage.java
  
  Index: SpyMessage.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessage.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- SpyMessage.java   2000/12/13 15:59:09     1.9
  +++ SpyMessage.java   2000/12/23 15:48:15     1.10
  @@ -6,7 +6,6 @@
    */
   package org.spydermq;
   
  -
   import javax.jms.Message;
   import javax.jms.JMSException;
   import javax.jms.MessageFormatException;
  @@ -23,7 +22,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.9 $
  + *   @version $Revision: 1.10 $
    */
   public class SpyMessage 
        implements Serializable, Cloneable, Message, Comparable
  @@ -39,30 +38,31 @@
        //Header fields 
        //Set by send() method
        public Destination jmsDestination=null;
  -     private int jmsDeliveryMode=-1;
  -     private long jmsExpiration=0;
  -     private int jmsPriority=-1;
  -     private String jmsMessageID=null;
  -     private long jmsTimeStamp=0;
  +     public int jmsDeliveryMode=-1;
  +     public long jmsExpiration=0;
  +     public int jmsPriority=-1;
  +     public String jmsMessageID=null;
  +     public long jmsTimeStamp=0;
        //Set by the client
  -     private boolean jmsCorrelationID=true;
  -     private String jmsCorrelationIDString=null; 
  -     private byte[] jmsCorrelationIDbyte=null;
  -     private Destination jmsReplyTo=null;
  -     private String jmsType=null;
  +     public boolean jmsCorrelationID=true;
  +     public String jmsCorrelationIDString=null; 
  +     public byte[] jmsCorrelationIDbyte=null;
  +     public Destination jmsReplyTo=null;
  +     public String jmsType=null;
        //Set by the provider
  -     private boolean jmsRedelivered=false;
  +     public boolean jmsRedelivered=false;
        //Properties
  -     private Hashtable prop;
  -     private boolean propReadWrite;
  +     public Hashtable prop;
  +     public boolean propReadWrite;
        //Message body
  -     protected boolean msgReadOnly=false;
  +     public boolean msgReadOnly=false;
        //For noLocal to be able to tell if this was a locally produced message
        public String producerClientId;
        
        // Transient Attributes ------------------------------------------
        //For acknowledgment (set on the client side)
  -     private transient SpySession spySession;
  +     public transient SpySession session;
  +     public transient boolean shouldAck;
        //For ordering in the JMSServerQueue (set on the server side)
        public transient long messageId;
        
  @@ -72,7 +72,6 @@
        {
                prop=new Hashtable();
                propReadWrite=true;
  -             spySession=null;
        }       
   
        // Public --------------------------------------------------------
  @@ -402,10 +401,10 @@
        
        public void acknowledge() throws JMSException
        {
  -             if (spySession==null) 
  +             if (session==null) 
                        throw new JMSException("This message was not recieved from the 
provider");
                        
  -             if( spySession.acknowledgeMode == spySession.CLIENT_ACKNOWLEDGE )
  +             if( session.acknowledgeMode == session.CLIENT_ACKNOWLEDGE )
                        doAcknowledge();
                
        }
  @@ -451,39 +450,31 @@
                        return 1;
                }
                return (int)(messageId - sm.messageId);         
  -     }     
  +     }       
   
        
        public void doAcknowledge() throws JMSException
        {
  -             
  -             SpyAcknowledgementItem item = new SpyAcknowledgementItem();
  -             item.jmsDestination = jmsDestination;
  -             item.jmsMessageID = jmsMessageID;
  -             item.isAck = true;
  -             
  -             spySession.getConnection().send(item);
  +             if( shouldAck )
  +                     session.connection.send(getAcknowledgementRequest(true));
                
        }       
        
        
        public void doNegAcknowledge() throws JMSException
        {
  -             SpyAcknowledgementItem item = new SpyAcknowledgementItem();
  -             item.jmsDestination = jmsDestination;
  -             item.jmsMessageID = jmsMessageID;
  -             item.isAck = false;
  -             
  -             spySession.getConnection().send(item);
  +             if( shouldAck )
  +                     session.connection.send(getAcknowledgementRequest(false));
        }
   
  +     public AcknowledgementRequest getAcknowledgementRequest(boolean isAck) throws 
JMSException
  +     {
                
  -     public SpySession getSpySession() {
  -             return spySession;
  -     }
  -
  -     
  -     public void setSpySession(SpySession newSpySession) {
  -             spySession = newSpySession;
  +             AcknowledgementRequest item = new AcknowledgementRequest();
  +             item.destination = jmsDestination;
  +             item.messageID = jmsMessageID;
  +             item.isAck = isAck;
  +             return item;            
  +             
        }
   }
  
  
  
  1.12      +14 -16    spyderMQ/src/java/org/spydermq/SpyTopicSession.java
  
  Index: SpyTopicSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSession.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- SpyTopicSession.java      2000/12/21 22:33:55     1.11
  +++ SpyTopicSession.java      2000/12/23 15:48:15     1.12
  @@ -13,6 +13,7 @@
   import javax.jms.JMSException;
   import javax.jms.TopicPublisher;
   import javax.jms.TemporaryTopic;
  +import javax.jms.MessageListener;
   import javax.jms.XATopicSession;
   
   import java.util.Collection;
  @@ -26,16 +27,13 @@
   import org.spydermq.selectors.Selector; 
   import org.spydermq.Log;
   
  -
  -import javax.jms.MessageListener;
  -
   /**
    *   This class implements javax.jms.TopicSession and javax.jms.XATopicSession
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.11 $
  + *   @version $Revision: 1.12 $
    */
   public class SpyTopicSession 
        extends SpySession 
  @@ -74,13 +72,8 @@
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                
  -             SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal);
  +             SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal, 
messageSelector, null);
                addConsumer(topic,sub);
  -             
  -             if (messageSelector!=null) {
  -                     Selector selector=new Selector(messageSelector);        
  -                     sub.setSelector(selector,messageSelector);
  -             }
   
                return sub;
        }
  @@ -88,17 +81,22 @@
        public TopicSubscriber createDurableSubscriber(Topic topic, String name) 
throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
  -                                                                             
  -             //Not yet implemented
  -             return createSubscriber(topic);
  +                             
  +             SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,false, null, 
name);
  +             addConsumer(topic,sub);
  +
  +             return sub;
  +
        }
   
        public TopicSubscriber createDurableSubscriber(Topic topic, String name, 
String messageSelector, boolean noLocal) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
  -                                                                             
  -             //Not yet implemented
  -             return createSubscriber(topic);
  +                             
  +             SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal, 
messageSelector, name);
  +             addConsumer(topic,sub);
  +
  +             return sub;
        }
   
        public TopicPublisher createPublisher(Topic topic) throws JMSException
  @@ -130,7 +128,7 @@
        {
                
                super.setMessageListener(listener);
  -             sessionConsumer = new SpyTopicSubscriber(this, null, false);
  +             sessionConsumer = new SpyTopicSubscriber(this, null, false,null,null);
   
        }
   }
  
  
  
  1.9       +10 -36    spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java
  
  Index: SpyTopicSubscriber.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- SpyTopicSubscriber.java   2000/12/21 22:33:56     1.8
  +++ SpyTopicSubscriber.java   2000/12/23 15:48:16     1.9
  @@ -20,7 +20,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.8 $
  + *   @version $Revision: 1.9 $
    */
   public class SpyTopicSubscriber 
        extends SpyMessageConsumer 
  @@ -31,16 +31,6 @@
        //The topic I registered
        private Topic topic;
   
  -
  -     // Constructor ---------------------------------------------------
  -        
  -     SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean noLocal) 
  -     {
  -             super(session, (SpyTopic)topic);
  -             this.topic=topic;
  -             this.noLocal=noLocal;
  -     }
  -
        // Public --------------------------------------------------------
   
        public Topic getTopic() throws JMSException
  @@ -52,35 +42,19 @@
        public boolean getNoLocal() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");                   
  -             return noLocal;
  +             return subscription.noLocal;
        }
        
  -     //Overrides MessageConsumer
  -
  -     public void close() throws JMSException
  +     // Constructor ---------------------------------------------------
  +        
  +     SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean noLocal, String 
selector, String durableSubscriptionName) 
        {
  -
  -             synchronized (messages) {
  -                     if (closed)
  -                             return;
  +             super(session, (SpyTopic)topic);
  +             this.topic=topic;
   
  -                     if (topic != null)
  -                             session.removeConsumer(topic, this);
  -                                             
  -                     if (waitInReceive && messageListener == null) {
  -                             //A consumer could be waiting in receive()
  -                             messages.notify();
  -                     }
  -                     
  -                     closed = true;
  -             }
  -             
  -     }
  -     
  -     /**
  -      * A topic is allways accepting messages from a destination.
  -      */
  -     public boolean isListening() {
  -             return true;
  +             subscription.destination = (SpyDestination)topic;
  +             subscription.messageSelector = selector;
  +             subscription.durableSubscriptionName = durableSubscriptionName;
  +             subscription.noLocal = noLocal;
        }
   }
  
  
  
  1.3       +0 -0      spyderMQ/src/java/org/spydermq/SpyXAResource.java
  
  Index: SpyXAResource.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyXAResource.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyXAResource.java        2000/12/21 22:33:56     1.2
  +++ SpyXAResource.java        2000/12/23 15:48:16     1.3
  @@ -16,7 +16,7 @@
    *      
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class SpyXAResource implements XAResource {
   
  
  
  
  1.2       +9 -9      spyderMQ/src/java/org/spydermq/SpyXAResourceManager.java
  
  Index: SpyXAResourceManager.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyXAResourceManager.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyXAResourceManager.java 2000/12/19 06:43:32     1.1
  +++ SpyXAResourceManager.java 2000/12/23 15:48:16     1.2
  @@ -19,7 +19,7 @@
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class SpyXAResourceManager implements java.io.Serializable {
   
  @@ -65,9 +65,9 @@
                TXState state = (TXState) transactions.get(xid);
                if (state == null)
                        throw new JMSException("Invalid transaction id.");
  -             SpyAcknowledgementItem item = new SpyAcknowledgementItem();
  -             item.jmsDestination = msg.getJMSDestination();
  -             item.jmsMessageID = msg.getJMSMessageID();
  +             AcknowledgementRequest item = new AcknowledgementRequest();
  +             item.destination = msg.getJMSDestination();
  +             item.messageID = msg.getJMSMessageID();
                item.isAck = true;
                state.ackedMessages.addLast(item);
        }
  @@ -93,8 +93,8 @@
                                transaction.messages = job;
                        }
                        if (state.ackedMessages.size() != 0) {
  -                             SpyAcknowledgementItem job[] = new SpyAcknowledgementItem[state.ackedMessages.size()];
  -                             job = (SpyAcknowledgementItem[]) state.ackedMessages.toArray(job);
  +                             AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages.size()];
  +                             job = (AcknowledgementRequest[]) state.ackedMessages.toArray(job);
                                transaction.acks = job;
                        }
                        connection.send(transaction);
  @@ -134,8 +134,8 @@
                        transaction.messages = job;
                }
                if (state.ackedMessages.size() != 0) {
  -                     SpyAcknowledgementItem job[] = new SpyAcknowledgementItem[state.ackedMessages.size()];
  -                     job = (SpyAcknowledgementItem[]) state.ackedMessages.toArray(job);
  +                     AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages.size()];
  +                     job = (AcknowledgementRequest[]) state.ackedMessages.toArray(job);
                        transaction.acks = job;
                }
                connection.send(transaction);
  @@ -158,8 +158,8 @@
                        transaction.requestType = transaction.ONE_PHASE_COMMIT_REQUEST;
                        transaction.xid = null;
                        if (state.ackedMessages.size() != 0) {
  -                             SpyAcknowledgementItem job[] = new SpyAcknowledgementItem[state.ackedMessages.size()];
  -                             job = (SpyAcknowledgementItem[]) state.ackedMessages.toArray(job);
  +                             AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages.size()];
  +                             job = (AcknowledgementRequest[]) state.ackedMessages.toArray(job);
                                transaction.acks = job;
                                //Neg Acknowlege all consumed messages
                                for (int i = 0; i < transaction.acks.length; i++) {
  
  
  
  1.2       +1 -1      spyderMQ/src/java/org/spydermq/TransactionRequest.java
  
  Index: TransactionRequest.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/TransactionRequest.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- TransactionRequest.java   2000/12/19 06:43:33     1.1
  +++ TransactionRequest.java   2000/12/23 15:48:16     1.2
  @@ -13,7 +13,7 @@
    *      
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class TransactionRequest 
        implements Serializable
  @@ -34,6 +34,6 @@
        public SpyMessage[] messages;
        
        // messages acknowleged in the transaction
  -     public SpyAcknowledgementItem[] acks;   
  +     public AcknowledgementRequest[] acks;   
   
   }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/AcknowledgementRequest.java
  
  Index: AcknowledgementRequest.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import java.io.Serializable;
  import javax.jms.Destination;
  
  /**
   * Used to Acknowledge sent messages.
   *
   * This class holds the minimum amount of information needed to
   * identify a message to the JMSServer: the destination the message
   * belongs to and its JMS message id.
   *
   * Two requests are equal when both their message id and their
   * destination are equal; isAck and subscriberId do not take part in
   * the comparison.
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   *
   * @version $Revision: 1.1 $
   */
  public class AcknowledgementRequest
        implements java.io.Serializable
  {

        // true when the message is being acknowledged
        // (presumably false means a negative acknowledgement - confirm with the server code)
        public boolean isAck;
        // The destination of the message; null until the caller fills it in
        public Destination destination=null;
        // The JMS message id of the message; null until the caller fills it in
        public String messageID=null;
        // Not serialized, so it is not sent along with the request
        transient public int subscriberId;

        /**
         * Compares this request with another one on message id and destination.
         *
         * Unset (null) fields are handled: a null field only matches
         * another null field, it never raises a NullPointerException.
         *
         * @param o the object to compare with
         * @return true if o is an AcknowledgementRequest with an equal
         *         message id and an equal destination
         */
        public boolean equals(Object o) {

                if( !(o instanceof AcknowledgementRequest ) )
                        return false;

                AcknowledgementRequest other = (AcknowledgementRequest)o;

                // Message id first, so the destination is only compared when the ids match
                if( messageID == null ) {
                        if( other.messageID != null )
                                return false;
                } else if( !messageID.equals(other.messageID) ) {
                        return false;
                }

                if( destination == null )
                        return other.destination == null;

                return destination.equals(other.destination);

        }

        /**
         * Hash code derived from the message id only, which keeps it
         * consistent with equals().
         *
         * @return the hash code of the message id, or 0 when it is not set
         */
        public int hashCode() {
                if( messageID == null )
                        return 0;
                return messageID.hashCode();
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/ReceiveRequest.java
  
  Index: ReceiveRequest.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import java.io.Serializable;
  
  /**
   *    This class pairs a message with the id of the subscription it is
   *    exclusively meant for, if there is one.
   *
   *    NOTE(review): the previous summary ("all the data needed to perform
   *    a JMS transaction") appears to have been copied from
   *    TransactionRequest and did not describe this class.
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class ReceiveRequest 
        implements Serializable
  {
        // The message
        public SpyMessage message;
        // Set (non-null) only when this is an exclusive message; null otherwise.
        // Presumably matches Subscription.subscriptionId - confirm against the sender.
        public Integer subscriptionId;
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/Subscription.java
  
  Index: Subscription.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import java.io.Serializable;
  
  import org.spydermq.selectors.Selector;
  
  /**
   *    This class contians all the data needed to for a the provider to
   *  to determine if a message can be routed to a consumer.
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class Subscription
        implements Serializable
  {
        // This gets set to a unique value at the SpyConnection
        public int subscriptionId;
        // the queue we want to subscribe to
        public SpyDestination destination;
        // the selector which will filter out messages
        public String messageSelector;
  
        // this is not null if we want a durable subscription
        public String durableSubscriptionName;
        // Topics might not want locally produced messages
        public boolean noLocal;
  
        // Transient Values 
        private transient Selector selector;
        public transient SpyDistributedConnection dc;
        public transient boolean listening;
        public transient boolean receiving;
  
        // Determines the consumer would accept the message.    
        public boolean accepts( SpyMessage message, boolean exclusive ) throws 
javax.jms.JMSException {
  
                if( messageSelector != null ) {
                        if( selector==null ) 
                                selector = new Selector(messageSelector);
  
                        if( !selector.test(message) )
                                return false;
                }
  
                if ( message.getJMSDestination() instanceof SpyTopic ) {
  
                        // In the Topic case we allways deliver unless we have a 
noLocal
                        if( noLocal &&  message.producerClientId.equals( 
dc.getClientID() ) )
                                return false;
  
                        // But if the subscriber is durable, then it acts like a Queue
                        if( durableSubscriptionName != null ) {
                                
                                if( !exclusive )
                                        return false;
                                if( !listening || !receiving )
                                        return false;
                                        
                        }
                                
                } else {
                        
                        if( !exclusive )
                                return false;                           
                        // In the Queue case we only deliver if it is currently
                        // has a listner or is receiving
                        if( !listening && !receiving )
                                return false;
                }
        
                return true;
                
        }
  }
  
  
  

Reply via email to