User: pkendall
  Date: 01/08/14 19:43:33

  Modified:    src/main/org/jboss/mq/server JMSTopic.java JMSServer.java
                        JMSQueue.java JMSDestination.java
                        ClientConsumer.java BasicQueue.java
  Log:
  move subscription from receivers list to blocked list when connection is stopped.  
Should stop messages from being delivered after connection is stopped.
  
  Revision  Changes    Path
  1.2       +92 -80    jbossmq/src/main/org/jboss/mq/server/JMSTopic.java
  
  Index: JMSTopic.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSTopic.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- JMSTopic.java     2001/08/11 20:59:15     1.1
  +++ JMSTopic.java     2001/08/15 02:43:33     1.2
  @@ -29,13 +29,13 @@
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *   @author David Maplesden ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class JMSTopic extends JMSDestination {
   
        //Hashmap of ExclusiveQueues
        HashMap durQueues = new HashMap();
  -  HashMap tempQueues = new HashMap();
  +     HashMap tempQueues = new HashMap();
   
   
        // Constructor ---------------------------------------------------
  @@ -44,87 +44,100 @@
                super( dest, temporary, server );
        }
   
  -
  +     public void clientConsumerStopped(ClientConsumer clientConsumer){
  +             synchronized (durQueues) {
  +                     Iterator iter = durQueues.values().iterator();
  +                     while( iter.hasNext() ) {
  +                             
((BasicQueue)iter.next()).clientConsumerStopped(clientConsumer);
  +                     }
  +             }
  +             synchronized (tempQueues) {
  +                     Iterator iter = tempQueues.values().iterator();
  +                     while( iter.hasNext() ) {
  +                             
((BasicQueue)iter.next()).clientConsumerStopped(clientConsumer);
  +                     }
  +             }
  +     }
   
        public void addSubscriber(Subscription sub) throws JMSException {
                SpyTopic topic = (SpyTopic)sub.destination;
                DurableSubcriptionID id = topic.getDurableSubscriptionID();
                if( id==null ) {
  -       BasicQueue q = new BasicQueue(server);
  -             synchronized (tempQueues) {
  +             BasicQueue q = new BasicQueue(server);
  +                     synchronized (tempQueues) {
                tempQueues.put(sub,q);
  -             }
  +                     }
                }else{
  -       PersistentQueue q = null;
  -       synchronized(durQueues){
  +             PersistentQueue q = null;
  +             synchronized(durQueues){
                q = (PersistentQueue) durQueues.get(id);
  -       }
  -       if(q == null || //Brand new durable subscriber
  -               !q.destination.equals(topic)) //subscription changed to new topic
  +             }
  +             if(q == null || //Brand new durable subscriber
  +                     !q.destination.equals(topic)) //subscription changed to new 
topic
                        server.getStateManager().setDurableSubscription(server, id, 
topic);
                }
        }
   
  -  public void removeSubscriber(Subscription sub) throws JMSException
  -  {
  +     public void removeSubscriber(Subscription sub) throws JMSException
  +     {
                BasicQueue queue = null;
  -     SpyTopic topic = (SpyTopic)sub.destination;
  +             SpyTopic topic = (SpyTopic)sub.destination;
                DurableSubcriptionID id = topic.getDurableSubscriptionID();
                if( id==null ) {
  -             synchronized (tempQueues) {
  -             queue = (BasicQueue)tempQueues.remove(sub);
  -             }
  +                     synchronized (tempQueues) {
  +                             queue = (BasicQueue)tempQueues.remove(sub);
  +                     }
                }else{
  -               synchronized (durQueues){
  -                 queue = (BasicQueue)durQueues.get(id); //note DON'T remove
  -       }
  +                     synchronized (durQueues){
  +                             queue = (BasicQueue)durQueues.get(id); //note DON'T 
remove
  +                     }
  +             }
  +             queue.removeReceiver(sub);
        }
  -     queue.removeReceiver(sub);
  -  }    
  -
  -  public void addReceiver(Subscription sub){
  -     getQueue(sub).addReceiver(sub);
  -  }    
   
  -  public void removeReceiver(Subscription sub){
  -     getQueue(sub).removeReceiver(sub);
  -  }    
  +     public void addReceiver(Subscription sub){
  +             getQueue(sub).addReceiver(sub);
  +     }
   
  -  public void restoreMessage(SpyMessage message){
  -     synchronized(this){
  -       messageIdCounter = Math.max(messageIdCounter,message.messageId+1);
  +     public void removeReceiver(Subscription sub){
  +             getQueue(sub).removeReceiver(sub);
        }
  -     if(message.durableSubscriberID == null){
  -       cat.debug("Trying to restore message with null durableSubscriberID");
  -     }else{
  -       
((BasicQueue)durQueues.get(message.durableSubscriberID)).restoreMessage(message);
  +
  +     public void restoreMessage(SpyMessage message){
  +             synchronized(this){
  +                     messageIdCounter = 
Math.max(messageIdCounter,message.messageId+1);
  +             }
  +             if(message.durableSubscriberID == null){
  +                     cat.debug("Trying to restore message with null 
durableSubscriberID");
  +             }else{
  +                     
((BasicQueue)durQueues.get(message.durableSubscriberID)).restoreMessage(message);
  +             }
        }
  -  }    
   
   
   
  -  //called by state manager when a durable sub is created
  +     //called by state manager when a durable sub is created
        public void createDurableSubscription(DurableSubcriptionID id) throws 
JMSException
  -  {
  +     {
                if( temporaryDestination != null )
                        throw new JMSException("Not a valid operation on a temporary 
topic");
   
                SpyTopic dstopic = new SpyTopic( (SpyTopic)destination, id);
   
  -     BasicQueue queue = new PersistentQueue(server,dstopic);
  +             BasicQueue queue = new PersistentQueue(server,dstopic);
                synchronized (durQueues) {
                        durQueues.put(id, queue);
                }
        }
   
  -  //called by state manager when a durable sub is deleted
  +     //called by state manager when a durable sub is deleted
        public void destoryDurableSubscription(DurableSubcriptionID id) throws 
JMSException
        {
                BasicQueue queue;
  -     synchronized (durQueues) {
  +             synchronized (durQueues) {
                        queue = (BasicQueue)durQueues.remove(id);
                }
  -     queue.destroy();
  +             queue.destroy();
        }
   
        // Package protected ---------------------------------------------
  @@ -135,25 +148,25 @@
        }
   
        public SpyMessage receive(Subscription sub,boolean wait) throws 
javax.jms.JMSException {
  -     return getQueue(sub).receive(sub,wait);
  +             return getQueue(sub).receive(sub,wait);
        }
   
   
  -  private BasicQueue getQueue(Subscription sub){
  -             SpyTopic topic = (SpyTopic)sub.destination;
  +     private BasicQueue getQueue(Subscription sub){
  +             SpyTopic topic = (SpyTopic)sub.destination;
                DurableSubcriptionID id = topic.getDurableSubscriptionID();
                if( id!=null ) {
  -       return getDurableSubscription(id);
  +                     return getDurableSubscription(id);
                }else{
  -       synchronized(tempQueues){
  -             return (BasicQueue)tempQueues.get(sub);
  -       }
  -             }
  -  }    
  -
  -  public void acknowledge(org.jboss.mq.AcknowledgementRequest req, Subscription 
sub, org.jboss.mq.pm.Tx txId) throws JMSException{
  -     getQueue(sub).acknowledge(req,txId);
  -  }      
  +                     synchronized(tempQueues){
  +                             return (BasicQueue)tempQueues.get(sub);
  +                     }
  +             }
  +     }
  +
  +     public void acknowledge(org.jboss.mq.AcknowledgementRequest req, Subscription 
sub, org.jboss.mq.pm.Tx txId) throws JMSException{
  +             getQueue(sub).acknowledge(req,txId);
  +     }
   
        public void addMessage(SpyMessage message, org.jboss.mq.pm.Tx txId) throws 
JMSException
        {
  @@ -164,30 +177,29 @@
   //           }
   
                //Number the message so that we can preserve order of delivery.
  -     long messageId = 0;
  +             long messageId = 0;
                synchronized(this) {
  -               messageId = messageIdCounter++;
  -       synchronized (durQueues) {
  -             Iterator iter = durQueues.keySet().iterator();
  -             while( iter.hasNext() ) {
  -               DurableSubcriptionID id = (DurableSubcriptionID) iter.next();
  -               PersistentQueue q = (PersistentQueue)durQueues.get(id);
  -               SpyMessage clone = message.myClone();
  -               clone.durableSubscriberID = id;
  -               clone.messageId = messageId;
  -               q.addMessage(clone, txId);
  -             }
  -       }
  -       synchronized (tempQueues) {
  -             Iterator iter = tempQueues.values().iterator();
  -             while( iter.hasNext() ) {
  -               BasicQueue q = (BasicQueue)iter.next();
  -               SpyMessage clone = message.myClone();
  -               clone.messageId = messageId;
  -               q.addMessage(clone, txId);
  +                     messageId = messageIdCounter++;
  +                     synchronized (durQueues) {
  +                             Iterator iter = durQueues.keySet().iterator();
  +                             while( iter.hasNext() ) {
  +                                     DurableSubcriptionID id = 
(DurableSubcriptionID) iter.next();
  +                                     PersistentQueue q = 
(PersistentQueue)durQueues.get(id);
  +                                     SpyMessage clone = message.myClone();
  +                                     clone.durableSubscriberID = id;
  +                                     clone.messageId = messageId;
  +                                     q.addMessage(clone, txId);
  +                             }
  +                     }
  +                     synchronized (tempQueues) {
  +                             Iterator iter = tempQueues.values().iterator();
  +                             while( iter.hasNext() ) {
  +                                     BasicQueue q = (BasicQueue)iter.next();
  +                                     SpyMessage clone = message.myClone();
  +                                     clone.messageId = messageId;
  +                                     q.addMessage(clone, txId);
  +                             }
  +                     }
                }
  -       }
  -             }
  -
  -  }      
  +     }
   }
  
  
  
  1.2       +35 -30    jbossmq/src/main/org/jboss/mq/server/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSServer.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- JMSServer.java    2001/08/11 20:59:15     1.1
  +++ JMSServer.java    2001/08/15 02:43:33     1.2
  @@ -41,16 +41,16 @@
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *   @author David Maplesden ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class JMSServer
   {
  -  //Implement Singleton design pattern for JVMIL to work correctly
  -  //It would be nice if there was another way of doing this.
  -  protected static JMSServer theInstance = new JMSServer();
  -  public static JMSServer getInstance(){
  +     //Implement Singleton design pattern for JVMIL to work correctly
  +     //It would be nice if there was another way of doing this.
  +     protected static JMSServer theInstance = new JMSServer();
  +     public static JMSServer getInstance(){
        return theInstance;
  -  }    
  +     }
   
        /////////////////////////////////////////////////////////////////////
        // Attributes
  @@ -70,7 +70,7 @@
        //The persistence manager
        private PersistenceManager persistenceManager;
   
  -  private Object stateLock = new Object();
  +     private Object stateLock = new Object();
   
        /**
         * <code>true</code> when the server is running.  <code>false</code> when the
  @@ -89,7 +89,7 @@
        HashMap clientConsumers = new HashMap();
   
        static org.apache.log4j.Category cat = 
org.apache.log4j.Category.getInstance(JMSServer.class);
  -  //Thread group for server side threads.
  +     //Thread group for server side threads.
        public ThreadGroup threadGroup = new ThreadGroup("JBossMQ Server Threads");
   
        /////////////////////////////////////////////////////////////////////
  @@ -113,29 +113,29 @@
         */
        public boolean isStopped() {
        synchronized(stateLock){
  -             return this.stopped;
  +                     return this.stopped;
        }
        }
   
  -  public void startServer() {
  +     public void startServer() {
        synchronized(stateLock){
  -       this.stopped = false;
  +             this.stopped = false;
  +     }
        }
  -  }    
   
        public void stopServer() {
        synchronized(stateLock){
  -       this.stopped = true;
  -       //Any work that needs doing should be done here
  +             this.stopped = true;
  +             //Any work that needs doing should be done here
   
  -       //At the moment there is nothing to do due to the fact that the individual
  -       //parts of the JBossMQ (pm, ils etc) each have their own mbean service
  -       //which starts and stops them separately
  -
  -       //We could wait in here for the client consumers to finish delivering any
  -       //messages they have, but it is not neccessary as the client acks will be
  -       //almost certainly be lost anyway.
  -               this.alive = false;
  +             //At the moment there is nothing to do due to the fact that the 
individual
  +             //parts of the JBossMQ (pm, ils etc) each have their own mbean service
  +             //which starts and stops them separately
  +
  +             //We could wait in here for the client consumers to finish delivering 
any
  +             //messages they have, but it is not neccessary as the client acks will 
be
  +             //almost certainly be lost anyway.
  +                     this.alive = false;
        }
        }
   
  @@ -404,6 +404,11 @@
        public void setEnabled(ConnectionToken dc, boolean enabled) throws 
JMSException {
                ClientConsumer ClientConsumer = getClientConsumer(dc);
                ClientConsumer.setEnabled(enabled);
  +             if(!enabled){
  +                     for(Iterator it = 
destinations.values().iterator();it.hasNext();){
  +                             
((JMSDestination)it.next()).clientConsumerStopped(ClientConsumer);
  +                     }
  +             }
        }
   
        public synchronized Queue createQueue(ConnectionToken dc, String name) throws 
JMSException
  @@ -436,9 +441,9 @@
   
        }
   
  -  public StateManager getStateManager() {
  +     public StateManager getStateManager() {
        return stateManager;
  -  }    
  +     }
   
        public String checkUser(String userName, String password) throws JMSException {
                return stateManager.checkUser(userName, password);
  @@ -462,9 +467,9 @@
   
   
   
  -  public void setStateManager(StateManager newStateManager) {
  +     public void setStateManager(StateManager newStateManager) {
        stateManager = newStateManager;
  -  }    
  +     }
   
        public static final String JBOSS_VESION = "JBossMQ ver. 0.9b";
   
  @@ -480,11 +485,11 @@
                return JBOSS_VESION;
        }
   
  -  public org.jboss.mq.pm.PersistenceManager getPersistenceManager() {
  +     public org.jboss.mq.pm.PersistenceManager getPersistenceManager() {
        return persistenceManager;
  -  }      
  +     }
   
  -  public void setPersistenceManager(org.jboss.mq.pm.PersistenceManager 
newPersistenceManager) {
  +     public void setPersistenceManager(org.jboss.mq.pm.PersistenceManager 
newPersistenceManager) {
        persistenceManager = newPersistenceManager;
  -  }      
  +     }
   }
  
  
  
  1.2       +32 -28    jbossmq/src/main/org/jboss/mq/server/JMSQueue.java
  
  Index: JMSQueue.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSQueue.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- JMSQueue.java     2001/08/11 20:59:15     1.1
  +++ JMSQueue.java     2001/08/15 02:43:33     1.2
  @@ -29,11 +29,11 @@
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *   @author David Maplesden ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class JMSQueue extends JMSDestination {
   
  -  BasicQueue queue;
  +     BasicQueue queue;
   
        // Constructor ---------------------------------------------------
        JMSQueue(SpyDestination dest,ClientConsumer temporary,JMSServer server) throws 
JMSException
  @@ -42,38 +42,42 @@
   
                // If this is a non-temp queue, then we should persist data
                if( temporaryDestination == null ) {
  -       queue = new PersistentQueue(server,dest);
  +             queue = new PersistentQueue(server,dest);
                }else{
  -             queue = new BasicQueue(server);
  +                     queue = new BasicQueue(server);
                }
        }
   
  -  public void addSubscriber(Subscription sub){
  -  }    
  +     public void clientConsumerStopped(ClientConsumer clientConsumer){
  +             queue.clientConsumerStopped(clientConsumer);
  +     }
   
  -  public void removeSubscriber(Subscription sub){
  -     removeReceiver(sub);
  -  }    
  -
  -  public void addReceiver(Subscription sub){
  -     queue.addReceiver(sub);
  -  }    
  -
  -  public void removeReceiver(Subscription sub){
  -     queue.removeReceiver(sub);
  -  }    
  -
  -  public void restoreMessage(SpyMessage message){
  -     synchronized(this){
  -       messageIdCounter = Math.max(messageIdCounter,message.messageId+1);
  +     public void addSubscriber(Subscription sub){
        }
  -     queue.restoreMessage(message);
  -  }    
   
  +     public void removeSubscriber(Subscription sub){
  +             removeReceiver(sub);
  +     }
   
  +     public void addReceiver(Subscription sub){
  +             queue.addReceiver(sub);
  +     }
   
  +     public void removeReceiver(Subscription sub){
  +             queue.removeReceiver(sub);
  +     }
   
  +     public void restoreMessage(SpyMessage message){
  +             synchronized(this){
  +                     messageIdCounter = 
Math.max(messageIdCounter,message.messageId+1);
  +             }
  +             queue.restoreMessage(message);
  +     }
   
  +
  +
  +
  +
        public SpyMessage[] browse(String selector) throws JMSException {
                return queue.browse( selector );
        }
  @@ -84,9 +88,9 @@
   
   
   
  -  public void acknowledge(org.jboss.mq.AcknowledgementRequest req, Subscription 
sub, org.jboss.mq.pm.Tx txId) throws JMSException{
  -     queue.acknowledge(req,txId);
  -  }      
  +     public void acknowledge(org.jboss.mq.AcknowledgementRequest req, Subscription 
sub, org.jboss.mq.pm.Tx txId) throws JMSException{
  +             queue.acknowledge(req,txId);
  +     }
   
        public void addMessage(SpyMessage mes, org.jboss.mq.pm.Tx txId) throws 
JMSException
        {
  @@ -97,8 +101,8 @@
   
                //Number the message so that we can preserve order of delivery.
                synchronized(this) {
  -               mes.messageId = messageIdCounter++;
  -             queue.addMessage(mes, txId);
  +                     mes.messageId = messageIdCounter++;
  +                     queue.addMessage(mes, txId);
                }
        }
   
  
  
  
  1.2       +7 -7      jbossmq/src/main/org/jboss/mq/server/JMSDestination.java
  
  Index: JMSDestination.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSDestination.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- JMSDestination.java       2001/08/11 20:59:15     1.1
  +++ JMSDestination.java       2001/08/15 02:43:33     1.2
  @@ -28,7 +28,7 @@
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *   @author David Maplesden ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   abstract public class JMSDestination {
   
  @@ -61,16 +61,16 @@
   
        public abstract SpyMessage receive(Subscription sub,boolean wait) throws 
JMSException;
   
  -  public abstract void addReceiver(Subscription sub);    
  +     public abstract void addReceiver(Subscription sub);
   
  -  public abstract void removeReceiver(Subscription sub);    
  +     public abstract void removeReceiver(Subscription sub);
   
  -  public abstract void restoreMessage(SpyMessage message);    
  +     public abstract void restoreMessage(SpyMessage message);
   
  +     public abstract void clientConsumerStopped(ClientConsumer clientConsumer);
   
  -
   /**
  - * 
  + *
    * @param req org.jboss.mq.AcknowledgementRequest
    * @param sub org.jboss.mq.Subscription
    * @param txId org.jboss.mq.pm.Tx
  @@ -79,7 +79,7 @@
   public abstract void acknowledge(org.jboss.mq.AcknowledgementRequest req, 
org.jboss.mq.Subscription sub, org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException;
   
   /**
  - * 
  + *
    * @param mes org.jboss.mq.SpyMessage
    * @param txId org.jboss.mq.pm.Tx
    * @exception javax.jms.JMSException The exception description.
  
  
  
  1.2       +7 -3      jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java
  
  Index: ClientConsumer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ClientConsumer.java       2001/08/11 20:59:15     1.1
  +++ ClientConsumer.java       2001/08/15 02:43:33     1.2
  @@ -28,7 +28,7 @@
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class ClientConsumer implements Runnable {
   
  @@ -123,13 +123,17 @@
                        return queue.receive(req,(wait != -1));
                }
                else if(wait != -1) {
  -                     blockedSubscriptions.add(req);
  +                     addBlockedSubscription(req);
                }
   
                return null;
        }
   
  -
  +     void addBlockedSubscription(Subscription sub){
  +             synchronized(blockedSubscriptions){
  +                     blockedSubscriptions.add(sub);
  +             }
  +     }
   
   
        public void removeSubscription(int subscriptionId) throws JMSException
  
  
  
  1.2       +81 -71    jbossmq/src/main/org/jboss/mq/server/BasicQueue.java
  
  Index: BasicQueue.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/BasicQueue.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- BasicQueue.java   2001/08/11 20:59:15     1.1
  +++ BasicQueue.java   2001/08/15 02:43:33     1.2
  @@ -37,7 +37,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author David Maplesden ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   //abstract public class BasicQueue implements Runnable {
   public class BasicQueue {
  @@ -59,34 +59,45 @@
                this.server=server;
        }
   
  -  private void internalAddMessage(SpyMessage message)
  -  {
  -     //try waiting receivers
  -     synchronized(receivers){
  -       if(!receivers.isEmpty()){
  -             for(Iterator it = receivers.iterator();it.hasNext();){
  -               Subscription sub = (Subscription)it.next();
  -               try{
  -                     if(sub.accepts(message)){
  -                       //queue message for sending to this sub
  -                       queueMessageForSending(sub,message);
  -                       it.remove();
  -                       return;
  -                     }
  -               }catch(JMSException ignore){
  -                     cat.debug("Caught unusual exception in 
internalAddMessage.",ignore);
  -               }
  +     private void internalAddMessage(SpyMessage message)
  +     {
  +             //try waiting receivers
  +             synchronized(receivers){
  +                     if(!receivers.isEmpty()){
  +                             for(Iterator it = receivers.iterator();it.hasNext();){
  +                                     Subscription sub = (Subscription)it.next();
  +                                     try{
  +                                             if(sub.accepts(message)){
  +                                                     //queue message for sending to 
this sub
  +                                                     
queueMessageForSending(sub,message);
  +                                                     it.remove();
  +                                                     return;
  +                                             }
  +                                     }catch(JMSException ignore){
  +                                             cat.debug("Caught unusual exception in 
internalAddMessage.",ignore);
  +                                     }
  +                             }
  +                     }
                }
  -       }
  -     }
   
                //else add to message list
                synchronized (messages) {
                        messages.add(message);
                }
  -  }    
  -
  +     }
   
  +     public void clientConsumerStopped(ClientConsumer clientConsumer){
  +             //remove all waiting subs for this clientConsumer and send to its 
blocked list.
  +             synchronized (receivers) {
  +                     for(Iterator it = receivers.iterator();it.hasNext();){
  +                             Subscription sub = (Subscription) it.next();
  +                             if(sub.clientConsumer.equals(clientConsumer)){
  +                                     clientConsumer.addBlockedSubscription(sub);
  +                                     it.remove();
  +                             }
  +                     }
  +             }
  +     }
   
        //Used to put a message that was added previously to the queue, back in the 
queue
        public void restoreMessage(SpyMessage mes)
  @@ -125,7 +136,7 @@
                }
        }
   
  -  protected void setupMessageAcknowledgement(Subscription sub,SpyMessage message){
  +     protected void setupMessageAcknowledgement(Subscription sub,SpyMessage 
message){
        AcknowledgementRequest ack = new AcknowledgementRequest();
        ack.destination = message.getJMSDestination();
        ack.messageID = message.getJMSMessageID();
  @@ -133,61 +144,61 @@
        ack.isAck = false;
   
        synchronized (unacknowledgedMessages) {
  -       unacknowledgedMessages.put(ack, message);
  +             unacknowledgedMessages.put(ack, message);
  +     }
        }
  -  }    
   
  -  protected void queueMessageForSending(Subscription sub, SpyMessage message) 
throws JMSException{
  +     protected void queueMessageForSending(Subscription sub, SpyMessage message) 
throws JMSException{
        setupMessageAcknowledgement(sub,message);
        ReceiveRequest r = new ReceiveRequest();
        r.message = message;
        r.subscriptionId = new Integer(sub.subscriptionId);
   
        ((ClientConsumer)sub.clientConsumer).queueMessageForSending(r);
  -  }    
  +     }
   
  -  public void addReceiver(Subscription sub){
  +     public void addReceiver(Subscription sub){
        synchronized(messages){
  -       if(messages.size() != 0){
  +             if(messages.size() != 0){
                for(Iterator it = messages.iterator();it.hasNext();){
  -               SpyMessage message = (SpyMessage)it.next();
  -               try{
  +                     SpyMessage message = (SpyMessage)it.next();
  +                     try{
                        if(sub.accepts(message)){
  -                       //queue message for sending to this sub
  -                       queueMessageForSending(sub,message);
  -                       it.remove();
  -                       return;
  +                             //queue message for sending to this sub
  +                             queueMessageForSending(sub,message);
  +                             it.remove();
  +                             return;
                        }
  -               }catch(JMSException ignore){
  +                     }catch(JMSException ignore){
                        cat.debug("Caught unusual exception in 
addToReceivers.",ignore);
  -               }
  +                     }
                }
  -       }
  +             }
        }
        addToReceivers(sub);
  -  }    
  +     }
   
  -  protected void addToReceivers(Subscription sub){
  -     synchronized(receivers){
  -       receivers.add(sub);
  +     protected void addToReceivers(Subscription sub){
  +             synchronized(receivers){
  +                     receivers.add(sub);
  +             }
        }
  -  }    
   
  -  public void removeReceiver(Subscription sub){
  -     synchronized(receivers){
  -       receivers.remove(sub);
  +     public void removeReceiver(Subscription sub){
  +             synchronized(receivers){
  +                     receivers.remove(sub);
  +             }
        }
  -  }    
   
        public SpyMessage receive(Subscription sub, boolean wait) throws  JMSException
        {
  -     SpyMessage message = null;
  +             SpyMessage message = null;
                Selector selector = sub.getSelector();
                if( selector == null ) {
                        synchronized (messages) {
                                if (messages.size()!=0){
  -               message = (SpyMessage)messages.first();
  -               messages.remove(message);
  +                                     message = (SpyMessage)messages.first();
  +                                     messages.remove(message);
                                }
                        }
                } else {
  @@ -196,36 +207,35 @@
                                while( i.hasNext() ) {
                                        SpyMessage m = (SpyMessage)i.next();
                                        if( selector.test(m) ) {
  -                     message = m;
  -                     messages.remove(message);
  +                                             message = m;
  +                                             messages.remove(message);
                                                break;
                                        }
                                }
                        }
                }
   
  -     if(message == null){
  -       if(wait)
  -             addToReceivers(sub);
  -     }else{
  -       setupMessageAcknowledgement(sub,message);
  -     }
  -     return message;
  +             if(message == null){
  +                     if(wait)
  +                             addToReceivers(sub);
  +             }else{
  +                     setupMessageAcknowledgement(sub,message);
  +             }
  +             return message;
        }
  -
  -  public void destroy() throws JMSException{
  -     synchronized (unacknowledgedMessages) {
  -       Iterator i = ((HashMap)unacknowledgedMessages.clone()).keySet().iterator();
  -       while( i.hasNext() ) {
   
  -             AcknowledgementRequest item = (AcknowledgementRequest)i.next();
  -             try {
  -               acknowledge(item, null);
  -             } catch ( JMSException ignore ) {
  +     public void destroy() throws JMSException{
  +             synchronized (unacknowledgedMessages) {
  +                     Iterator i = 
((HashMap)unacknowledgedMessages.clone()).keySet().iterator();
  +                     while( i.hasNext() ) {
  +                             AcknowledgementRequest item = 
(AcknowledgementRequest)i.next();
  +                             try {
  +                                     acknowledge(item, null);
  +                             } catch ( JMSException ignore ) {
  +                             }
  +                     }
                }
  -       }
        }
  -  }    
   
        public void acknowledge(AcknowledgementRequest item, org.jboss.mq.pm.Tx txId) 
throws javax.jms.JMSException {
   
  @@ -279,7 +289,7 @@
   
                        public void run() {
                                //restore a message to the message list...
  -             internalAddMessage(message);
  +                             internalAddMessage(message);
                        }
                }
   
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to