User: hiram   
  Date: 00/12/23 17:55:08

  Modified:    src/java/org/spydermq/server SharedQueue.java
                        PersistenceManager.java JMSServer.java
                        JMSDestination.java ExclusiveQueue.java
                        ClientConsumer.java
  Added:       src/java/org/spydermq/server BasicQueue.java
  Log:
  ConnectionConsumer fixes and server synchronization optimizations.
  Spyder should now work with the ASF implementation Peter did.
  
  Revision  Changes    Path
  1.2       +3 -62     spyderMQ/src/java/org/spydermq/server/SharedQueue.java
  
  Index: SharedQueue.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/SharedQueue.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SharedQueue.java  2000/12/23 15:48:25     1.1
  +++ SharedQueue.java  2000/12/24 01:55:06     1.2
  @@ -26,9 +26,9 @@
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
  -public class SharedQueue implements Task, AbstractQueue {
  +public class SharedQueue extends BasicQueue {
   
        //List of Pending messages
        private TreeSet messages;
  @@ -40,77 +40,18 @@
        // Constructor ---------------------------------------------------         
        SharedQueue(JMSServer server) throws JMSException
        {
  -             this.server=server;
  +             super( server );
                consumers=new LinkedList();
                messages=new TreeSet();                 
        }
   
  -     public void addMessage(SpyMessage mes, Long txId) throws JMSException
  -     {
  -
  -             // This task gets run to make the message visible in the queue.
  -             class AddMessagePostCommitTask implements Runnable {
  -                     SpyMessage message;
  -                     
  -                     AddMessagePostCommitTask(SpyMessage m) {
  -                             message = m;
  -                     }
  -                     
  -                     public void run() {
  -                             synchronized (messages) 
  -                             {
  -
  -                                     //Add the message to the queue
  -                                     messages.add(message);                  
  -                                     notifyMessageAvailable();
  -                                     
  -                             }                                       
  -                     }
  -             }
  -             
  -             // The message gets added to the queue after the transaction
  -             // commits (if the message was transacted)      
  -             Runnable task = new AddMessagePostCommitTask(mes);
  -             if( txId == null ) {
  -                     task.run();
  -             } else {
  -                     server.persistenceManager.addPostCommitTask(txId, task);
  -             }
  -             
  -     }
  -     
  -     // Package protected ---------------------------------------------          
  -     public void addConsumer(ClientConsumer consumer) throws JMSException
  -     {
  -             //We want to avoid removeSubscriber, addSubscriber or sendOneMessage 
to work concurently
  -             synchronized (consumers) {
  -                     consumers.add(consumer);
  -             }
  -     }
  -
  -     public void notifyMessageAvailable() {
  -             
  -             synchronized (server.taskQueue) {
  -                     server.taskQueue.addLast(this);
  -                     server.taskQueue.notify();
  -             }
  -             
  -     }
  -
  -     public void removeConsumer(ClientConsumer consumer) throws JMSException
  -     {
  -             synchronized (consumers) {
  -                     consumers.remove(consumer);
  -             }
  -     }
   
        // This will dispatch messages in the queue the the ClientConsumers
  -     synchronized public void run() throws JMSException 
  +     public void run() throws JMSException 
        {       
                SpyMessage[] job;
                
                synchronized (messages) {
  -                     
                        if( messages.size() == 0 )
                                return;
                                
  
  
  
  1.4       +0 -0      spyderMQ/src/java/org/spydermq/server/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/PersistenceManager.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- PersistenceManager.java   2000/12/23 15:48:24     1.3
  +++ PersistenceManager.java   2000/12/24 01:55:06     1.4
  @@ -26,7 +26,7 @@
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class PersistenceManager {
   
  
  
  
  1.6       +2 -2      spyderMQ/src/java/org/spydermq/server/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServer.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- JMSServer.java    2000/12/23 15:48:24     1.5
  +++ JMSServer.java    2000/12/24 01:55:07     1.6
  @@ -27,7 +27,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.5 $
  + *   @version $Revision: 1.6 $
    */
   public class JMSServer 
                implements Runnable, JMSServerMBean
  @@ -497,10 +497,10 @@
                
        }
        
  -     public void restoreMessage(SpyMessage message, String queueId) throws 
JMSException
  +     public void restoreMessage(SpyMessage message, String queueId) 
        {
                JMSDestination 
queue=(JMSDestination)messageQueue.get(message.jmsDestination);
  -             if (queue==null) throw new JMSException("This destination does not 
exist !");   
  +             if (queue==null) throw new RuntimeException("This destination does not 
exist!");
                //Add the message to the queue
                queue.restoreMessage(message, queueId);
        }
  
  
  
  1.2       +3 -6      spyderMQ/src/java/org/spydermq/server/JMSDestination.java
  
  Index: JMSDestination.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSDestination.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- JMSDestination.java       2000/12/23 15:48:25     1.1
  +++ JMSDestination.java       2000/12/24 01:55:07     1.2
  @@ -26,7 +26,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class JMSDestination {
   
  @@ -128,13 +128,10 @@
        }
   
        // Package protected ---------------------------------------------          
  -     ExclusiveQueue getExclusiveQueue(String queue) throws JMSException {
  +     ExclusiveQueue getExclusiveQueue(String queue) {
                
  -             ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( queue );
  -             if( eq == null )
  -                     throw new JMSException("That destination queue does not 
exist");
  +             return (ExclusiveQueue)exclusiveQueues.get( queue );
                
  -             return eq;
        }
   
        // Package protected ---------------------------------------------          
  @@ -169,7 +166,7 @@
        }
   
        //Used to put a message that was added previously to the queue, back in the 
queue
  -     public void restoreMessage(SpyMessage mes, String queueId) throws JMSException
  +     public void restoreMessage(SpyMessage mes, String queueId) 
        {
                Log.log(""+this+"->restoreMessage(mes="+mes+",queue="+queueId+")");
                ExclusiveQueue eq = getExclusiveQueue(queueId);         
  
  
  
  1.2       +5 -119    spyderMQ/src/java/org/spydermq/server/ExclusiveQueue.java
  
  Index: ExclusiveQueue.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/ExclusiveQueue.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ExclusiveQueue.java       2000/12/23 15:48:25     1.1
  +++ ExclusiveQueue.java       2000/12/24 01:55:07     1.2
  @@ -28,143 +28,29 @@
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
  -public class ExclusiveQueue implements Task, AbstractQueue {
  +public class ExclusiveQueue extends BasicQueue {
   
  -     //List of messages waiting to be dispatched
  -     TreeSet messages = new TreeSet();
  -     //The JMSServer object
  -     JMSServer server;
  -     //DistributedConnection objs that have "registered" to this Destination
  -     private LinkedList consumers = new LinkedList();
        //The queueId needed to identify this queue with the persistence manager.
        String queueId;
  -
  -             
  -     //Used to put a message that was added previously to the queue, back in the 
queue
  -     public void restoreMessage(SpyMessage mes) 
  -     {
  -             //restore a message to the message list...
  -             synchronized (messages) {
  -                     messages.add(mes);      
  -             }
  -             notifyMessageAvailable();
  -     }       
        
        public void addMessage(SpyMessage mes, Long txId) throws JMSException
        {
  -             Log.log(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
   
  -             // This task gets run to make the message visible in the queue.
  -             class AddMessagePostCommitTask implements Runnable {
  -                     SpyMessage message;
  -                     
  -                     AddMessagePostCommitTask(SpyMessage m) {
  -                             message = m;
  -                     }
  -                     
  -                     public void run() {
  -                             //restore a message to the message list...
  -                             synchronized (messages) {
  -                                     messages.add(message);  
  -                             }
  -                             notifyMessageAvailable();
  -                     }
  -             }
  -             
                // Persist the message if it was persistent
                if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) 
                        server.persistenceManager.add(queueId, mes, txId);
  -
  -             // The message gets added to the queue after the transaction
  -             // commits (if the message was transacted)      
  -             Runnable task = new AddMessagePostCommitTask(mes);
  -             if( txId == null ) {
  -                     task.run();
  -             } else {
  -                     server.persistenceManager.addPostCommitTask(txId, task);
  -             }
  +                     
  +             super.addMessage(mes, txId);
                
        }
        
        // Constructor ---------------------------------------------------         
        public ExclusiveQueue(JMSServer server, String queueId) throws JMSException
  -     {
  -             
  -             this.server=server;
  -             this.queueId = queueId;
  -                                     
  -     }
  -
  -     // synchrnozed so no message dispatching occurs while we add a consumer
  -     synchronized public void addConsumer(ClientConsumer consumer) throws 
JMSException
  -     {
  -             //We want to avoid removeSubscriber, addSubscriber or sendOneMessage 
to work concurently
  -             synchronized (consumers) {
  -                     consumers.add(consumer);
  -             }
  -     }
  -
  -     public SpyMessage[] browse(String selector) throws JMSException {
  -                     
  -             if( selector == null ) {
  -                     SpyMessage list[];
  -                     synchronized (messages) {
  -                             list = new SpyMessage[messages.size()];
  -                             list = (SpyMessage [])messages.toArray(list);
  -                     }
  -                     return list;
  -             } else {
  -                     Selector s = new Selector( selector );
  -                     LinkedList selection=new LinkedList();
  -                     
  -                     synchronized (messages) {
  -                             Iterator i = messages.iterator();
  -                             while( i.hasNext() ) {
  -                                     SpyMessage m = (SpyMessage)i.next();
  -                                     if( s.test(m) )
  -                                             selection.add(m);
  -                             }
  -                     }
  -                     
  -                     SpyMessage list[];
  -                     list = new SpyMessage[selection.size()];
  -                     list = (SpyMessage [])selection.toArray(list);
  -                     return list;                    
  -             }
  -     }
  -
  -     public void notifyMessageAvailable() {
  -
  -             Log.log(""+this+"->notifyMessageAvailable()");
  -             
  -             synchronized (server.taskQueue) {
  -                     server.taskQueue.addLast(this);
  -                     server.taskQueue.notify();
  -             }
  -             
  -     }
  -
  -     //Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
  -     public SpyMessage receiveMessage() throws  JMSException
  -     {
  -             synchronized (messages) {
  -                     if (messages.size()==0) 
  -                             return null;
  -                             
  -                     SpyMessage m = (SpyMessage)messages.first();
  -                     messages.remove(m);
  -                     
  -                     return m;
  -             }
  -     }
  -
  -     public void removeConsumer(ClientConsumer consumer) throws JMSException
        {
  -             synchronized (consumers) {
  -                     consumers.remove(consumer);
  -             }
  +             super(server);
  +             this.queueId = queueId;                                 
        }
   
        // Iterate over the consumers asking them to take messages until they stop
  
  
  
  1.2       +72 -58    spyderMQ/src/java/org/spydermq/server/ClientConsumer.java
  
  Index: ClientConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/ClientConsumer.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ClientConsumer.java       2000/12/23 15:48:25     1.1
  +++ ClientConsumer.java       2000/12/24 01:55:07     1.2
  @@ -27,7 +27,7 @@
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class ClientConsumer implements Task {
   
  @@ -67,15 +67,14 @@
                        RestoreMessageTask(SpyMessage m,int subscriptionId) { message 
= m; this.subscriptionId=subscriptionId; }
                        public void run() {
                                Log.log("Restoring message: " + message.jmsMessageID);
  -                             String queueId = JMSDestination.DEFAULT_QUEUE_ID;
  +                             String queueId;
                                if( message.jmsDestination instanceof SpyTopic ) {
                                        // Still need to implement
  -                                     //queueId 
  -                             }
  -                             try {
  -                                     server.restoreMessage(message,queueId);
  -                             } catch (JMSException ignore ) {
  +                                     queueId = null;
  +                             } else {
  +                                     queueId = JMSDestination.DEFAULT_QUEUE_ID;
                                }
  +                             server.restoreMessage(message,queueId);
                        }
                }               
                
  @@ -124,14 +123,13 @@
                        Subscription s = (Subscription)subs.next();
                        if( s.accepts( message, false ) ) {
                        
  +                             ReceiveRequest r = new ReceiveRequest();
  +                             r.message = message;
  +                             
                                synchronized (messages) {
  -
  -                                     ReceiveRequest r = new ReceiveRequest();
  -                                     r.message = message;
  -                                     
                                        messages.add(r);
  -                                     
                                }
  +                             
                                return;
                        }
                }
  @@ -146,18 +144,24 @@
                req.dc = dc;
                
                synchronized (subscriptions ) {
  -
  -                     subscriptions.put(new Integer(req.subscriptionId), req );
  +                     
  +                     HashMap subscriptionsClone = (HashMap)subscriptions.clone();
  +                     subscriptionsClone.put(new Integer(req.subscriptionId), req );
  +                     subscriptions = subscriptionsClone;
                                
                        LinkedList ll = (LinkedList)destinationSubscriptions.get( 
req.destination );
                        if( ll == null ) {
  -                             ll = new LinkedList();
  -                             
  -                             destinationSubscriptions.put(req.destination, ll );
  -                             
  +
                                JMSDestination 
queue=(JMSDestination)server.getJMSDestination(req.destination);
                                if (queue==null) throw new JMSException("This 
destination does not exist !");
                                
  +                             ll = new LinkedList();
  +                             ll.add( req );
  +                             
  +                             HashMap destinationSubscriptionsClone = 
(HashMap)destinationSubscriptions.clone();
  +                             destinationSubscriptionsClone.put(req.destination, ll 
);
  +                             destinationSubscriptions = 
destinationSubscriptionsClone;
  +                                                             
                                if( queue.isTopic ) {
                                        if( req.durableSubscriptionName!=null ) {
                                                // 
queue.addExclusiveConsumer(dc.getClientID(), this);
  @@ -167,9 +171,14 @@
                                } else  {
                                        
queue.addExclusiveConsumer(queue.DEFAULT_QUEUE_ID, this);
                                }
  +                     } else {
  +                             LinkedList llClone = (LinkedList)ll.clone();
  +                             llClone.add( req );
  +                             
  +                             HashMap destinationSubscriptionsClone = 
(HashMap)destinationSubscriptions.clone();
  +                             destinationSubscriptions.put(req.destination, llClone);
  +                             destinationSubscriptions = 
destinationSubscriptionsClone;
                        }
  -
  -                     ll.add( req );
                }                       
        }
   
  @@ -191,7 +200,7 @@
                }
                
                synchronized (unacknowledgedMessages) {
  -                     Iterator i = unacknowledgedMessages.keySet().iterator();
  +                     Iterator i = 
((HashMap)unacknowledgedMessages.clone()).keySet().iterator();
                        while( i.hasNext() ) {
                                
                                AcknowledgementRequest item = 
(AcknowledgementRequest)i.next();
  @@ -292,17 +301,26 @@
                Subscription req;
                synchronized (subscriptions ) {
   
  -                     req = (Subscription)subscriptions.remove(new 
Integer(subscriptionId));
  -                     
  +                     HashMap subscriptionsClone = (HashMap)subscriptions.clone();
  +                     req = (Subscription)subscriptionsClone.remove(new 
Integer(subscriptionId));
  +                     subscriptions = subscriptionsClone;
  +                             
                        if( req == null )
                                throw new JMSException("The subscription had not been 
previously registered");
  +
                                
                        LinkedList ll = (LinkedList)destinationSubscriptions.get( 
req.destination );
                        if( ll == null ) 
                                throw new JMSException("The subscription was not 
registered with the destination");
  -                             
  -                     ll.remove( req );
  -                     if( ll.size() != 0 )
  +
  +                     LinkedList llClone = (LinkedList)ll.clone();
  +                     llClone.remove( req );
  +
  +                     HashMap destinationSubscriptionsClone = 
(HashMap)destinationSubscriptions.clone();
  +                     destinationSubscriptionsClone.put( req.destination, llClone );
  +                     destinationSubscriptions = destinationSubscriptionsClone;
  +                                                     
  +                     if( llClone.size() != 0 )
                                return;
                        
                        // There is no subscriber for the destination at this point    
 
  @@ -358,47 +376,43 @@
                while( i.hasNext() ) {
                        
                        SpyMessage message = (SpyMessage)i.next();
  +                     
  +                     LinkedList l = (LinkedList)destinationSubscriptions.get( 
message.getJMSDestination() );
  +                     if( l == null ) return false;
   
  -                     synchronized (subscriptions) {
  +                     Iterator subs = l.iterator();                   
  +                     while(  subs.hasNext() ) {
                                
  -                             LinkedList l = 
(LinkedList)destinationSubscriptions.get( message.getJMSDestination() );
  -                             if( l == null )
  -                                     throw new JMSException("No subscription found 
for that destination.");
  +                             Subscription s = (Subscription)subs.next();
  +                             if( s.accepts( message, true ) ) {
   
  -                             Iterator subs = l.iterator();
  -                             
  -                             while(  subs.hasNext() ) {
  +                                     s.receiving = false;
  +                                     i.remove();
                                        
  -                                     Subscription s = (Subscription)subs.next();
  -                                     if( s.accepts( message, true ) ) {
  -
  -                                             s.receiving = false;
  -                                             i.remove();
  -                                             
  -                                             synchronized (messages) {
  -
  -                                                     ReceiveRequest r = new 
ReceiveRequest();
  -                                                     r.message = message;
  -                                                     r.subscriptionId = new 
Integer(s.subscriptionId);
  -                                                     
  -                                                     messages.add(r);
  -                                                     
  -                                                     AcknowledgementRequest ack = 
new AcknowledgementRequest();
  -                                                     ack.destination = 
message.getJMSDestination();
  -                                                     ack.messageID = 
message.getJMSMessageID();
  -                                                     ack.subscriberId = 
s.subscriptionId;
  -                                                     ack.isAck = false;
  -                                                     
unacknowledgedMessages.put(ack, message);
  -                                                     
  -                                             }
  -                                             notifyMessageAvailable();
  +                                     ReceiveRequest r = new ReceiveRequest();
  +                                     r.message = message;
  +                                     r.subscriptionId = new 
Integer(s.subscriptionId);
  +                                                                             
  +                                     synchronized (messages) {
  +                                             messages.add(r);                       
                         
  +                                     }
  +                                     
  +                                     AcknowledgementRequest ack = new 
AcknowledgementRequest();
  +                                     ack.destination = message.getJMSDestination();
  +                                     ack.messageID = message.getJMSMessageID();
  +                                     ack.subscriberId = s.subscriptionId;
  +                                     ack.isAck = false;
   
  -                                             return true;
  -                                             
  +                                     synchronized (unacknowledgedMessages) {
  +                                             unacknowledgedMessages.put(ack, 
message);
                                        }
  +                                     
  +                                     notifyMessageAvailable();
  +                                     return true;
  +                                     
                                }
  -                             
                        }
  +                             
                }
                
                return false;
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/server/BasicQueue.java
  
  Index: BasicQueue.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import javax.jms.DeliveryMode;
  
  import org.spydermq.*;
  import org.spydermq.persistence.SpyMessageLog;
  import org.spydermq.selectors.Selector;
  
  import java.util.Iterator;
  import java.util.Hashtable;
  import java.util.LinkedList;
  import java.util.HashMap;
  import java.util.TreeSet;
  
  
  /**
   *    This class represents a queue which provides it's messages
   *  exclusivly to one consumer at a time.
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  abstract public class BasicQueue implements Task, AbstractQueue {
  
        //List of messages waiting to be dispatched
        TreeSet messages = new TreeSet();
        //The JMSServer object
        JMSServer server;
        //DistributedConnection objs that have "registered" to this Destination
        LinkedList consumers = new LinkedList();
  
  
        // Constructor ---------------------------------------------------         
        public BasicQueue(JMSServer server) throws JMSException
        {
                
                this.server=server;
                                        
        }
                
        //Used to put a message that was added previously to the queue, back in the 
queue
        public void restoreMessage(SpyMessage mes) 
        {
                //restore a message to the message list...
                synchronized (messages) {
                        messages.add(mes);      
                }
                notifyMessageAvailable();
        }       
        
        public void addMessage(SpyMessage mes, Long txId) throws JMSException
        {
                Log.log(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
  
                // This task gets run to make the message visible in the queue.
                class AddMessagePostCommitTask implements Runnable {
                        SpyMessage message;
                        
                        AddMessagePostCommitTask(SpyMessage m) {
                                message = m;
                        }
                        
                        public void run() {
                                //restore a message to the message list...
                                synchronized (messages) {
                                        messages.add(message);  
                                }
                                notifyMessageAvailable();
                        }
                }
                
                // The message gets added to the queue after the transaction
                // commits (if the message was transacted)      
                Runnable task = new AddMessagePostCommitTask(mes);
                if( txId == null ) {
                        task.run();
                } else {
                        server.persistenceManager.addPostCommitTask(txId, task);
                }
                
        }
  
        // 
        public void addConsumer(ClientConsumer consumer) throws JMSException
        {
                //We want to avoid removeSubscriber, addSubscriber or sendOneMessage 
to work concurently
                synchronized (consumers) {
                        LinkedList consumersClone = (LinkedList)consumers.clone();
                        consumersClone.add(consumer);
                        consumers = consumersClone;
                }
        }
  
        public SpyMessage[] browse(String selector) throws JMSException {
                        
                if( selector == null ) {
                        SpyMessage list[];
                        synchronized (messages) {
                                list = new SpyMessage[messages.size()];
                                list = (SpyMessage [])messages.toArray(list);
                        }
                        return list;
                } else {
                        Selector s = new Selector( selector );
                        LinkedList selection=new LinkedList();
                        
                        synchronized (messages) {
                                Iterator i = messages.iterator();
                                while( i.hasNext() ) {
                                        SpyMessage m = (SpyMessage)i.next();
                                        if( s.test(m) )
                                                selection.add(m);
                                }
                        }
                        
                        SpyMessage list[];
                        list = new SpyMessage[selection.size()];
                        list = (SpyMessage [])selection.toArray(list);
                        return list;                    
                }
        }
  
        public void notifyMessageAvailable() {
  
                Log.log(""+this+"->notifyMessageAvailable()");
                
                synchronized (server.taskQueue) {
                        server.taskQueue.addLast(this);
                        server.taskQueue.notify();
                }
                
        }
  
        //Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
        public SpyMessage receiveMessage() throws  JMSException
        {
                synchronized (messages) {
                        if (messages.size()==0) 
                                return null;
                                
                        SpyMessage m = (SpyMessage)messages.first();
                        messages.remove(m);
                        
                        return m;
                }
        }
  
        public void removeConsumer(ClientConsumer consumer) throws JMSException
        {
                synchronized (consumers) {
                        LinkedList consumersClone = (LinkedList)consumers.clone();
                        consumersClone.remove(consumer);
                        consumers = consumersClone;
                }
        }
  
  }
  
  
  

Reply via email to