User: hiram   
  Date: 00/11/19 11:59:59

  Modified:    src/java/org/spydermq JMSServer.java JMSServerQueue.java
                        SpyConnection.java SpyDistributedConnection.java
                        SpyMessage.java SpyMessageConsumer.java
                        SpyQueueReceiver.java SpyQueueSession.java
                        SpySession.java SpyTopicSession.java
                        SpyTopicSubscriber.java
  Added:       src/java/org/spydermq JMSServerQueueReceiver.java
                        SpyAcknowledgementItem.java
  Removed:     src/java/org/spydermq ConnectionQueue.java SessionQueue.java
  Log:
  Committing several changes:
   - Removed ConnectionQueue and SessionQueue.  All consumer management is now done in 
SpyConnection.
   - Unacknowledged messages are maintained on the server side now. 
(JMSServerQueueReceiver)
   - Acknowledgment messages are sent to the server from the client.
   - Optimized the OIL and UIL transports by caching the DistributedConnection on the 
JMSServer when the ConnectionReceiver is set up.
   - Cleaned up the OIL by only using the Object(Output/Input) streams instead of both 
Object(Output/Input) and Buffered(Output/Input) streams.
   - A QueueReceiver now does a request for a single message on a receive() method 
instead of turning listening on/off to get a message.
   - For the OIL and UIL, a connection failure/termination is now handled gracefully 
(if connectionClosing() was already called, no errors are shown; if it was not called 
and the connection failed, we call it ourselves).
  
  Revision  Changes    Path
  1.9       +103 -87   spyderMQ/src/java/org/spydermq/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServer.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- JMSServer.java    2000/08/25 02:30:23     1.8
  +++ JMSServer.java    2000/11/19 19:59:56     1.9
  @@ -22,14 +22,14 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.8 $
  + *   @version $Revision: 1.9 $
    */
   public class JMSServer 
                implements Runnable, JMSServerMBean
   {
        
        // Constants -----------------------------------------------------
  -    
  +     
        //number of threads in the pool (TO DO: this value should be dynamic)
        final int NB_THREADS=1;
        public static final String OBJECT_NAME = "JMS:service=JMSServer";
  @@ -49,18 +49,18 @@
        //The security manager
        SecurityManager securityManager;
   
  -    /**
  -     * <code>true</code> when the server is running.  <code>false</code> when the 
  -     * server should stop running.
  -     */
  -    private boolean alive = true;
  -
  -    /**
  -     * Because there can be a delay between killing the JMS service and the 
  -     * service actually dying, this field is used to tell external classes
  -     * that that server has actually stopped.
  -     */
  -    private boolean stopped = true;
  +     /**
  +      * <code>true</code> when the server is running.  <code>false</code> when the 
  +      * server should stop running.
  +      */
  +     private boolean alive = true;
  +
  +     /**
  +      * Because there can be a delay between killing the JMS service and the 
  +      * service actually dying, this field is used to tell external classes
  +      * that that server has actually stopped.
  +      */
  +     private boolean stopped = true;
   
        // Constructor ---------------------------------------------------
      
  @@ -82,19 +82,19 @@
                lastTemporaryTopic=1;
                this.securityManager=securityManager;
                
  -    }
  +     }
   
  -    /**
  -     * Returns <code>false</code> if the JMS server is currently
  -     * running and handling requests, <code>true</code> otherwise.
  -     *
  -     * @return <code>false</code> if the JMS server is currently
  -     *         running and handling requests, <code>true</code> 
  -     *         otherwise.
  -     */
  -    public boolean isStopped() {
  -        return this.stopped;
  -    }
  +     /**
  +      * Returns <code>false</code> if the JMS server is currently
  +      * running and handling requests, <code>true</code> otherwise.
  +      *
  +      * @return <code>false</code> if the JMS server is currently
  +      *         running and handling requests, <code>true</code> 
  +      *         otherwise.
  +      */
  +     public boolean isStopped() {
  +             return this.stopped;
  +     }
   
        // Public --------------------------------------------------------
   
  @@ -102,59 +102,58 @@
        //We should let threads cycle through the JMSServerQueue list, and 
synchronized on the queue they are working on.
        
        public void run() {
  -            while (alive) {
  -                JMSServerQueue queue = null;                 
  +                     while (alive) {
  +                             JMSServerQueue queue = null;                    
                
  -                this.stopped = false;
  +                             this.stopped = false;
   
  -                //Wait (and sleep) until it can find something to do
  -                synchronized (taskQueue) {
  -                    while (queue == null && alive) {                                
 
  -                        
  -                        // size() is O(1) in LinkedList... 
  -                        int size=taskQueue.size(); 
  -                        if (size!=0) { 
  -                            
  -                            //<DEBUG>
  -                            queue = (JMSServerQueue)taskQueue.removeFirst();
  -                            //queue=(JMSServerQueue)taskQueue.getFirst();
  -                            //</DEBUG>
  -                            
  -                            //One other thread can start working on the task 
queue...
  -                            if (size > 1) {
  -                                taskQueue.notify();
  -                            }
  -                        } else {     
  -                            try {
  -                                Log.log("I'm going to bed...");
  -                                taskQueue.wait(5000);
  -                                Log.log("I wake up");
  -                            } catch (InterruptedException e) {
  -                            }
  -                        }
  -                        
  -                    }
  -                }
  -                
  -                if (alive) {
  -                    //Ask the queue to do its job
  -                    try {
  -                        queue.doMyJob();
  -                    } catch (JMSException e) {
  -                        Log.error(e);
  -                    }
  -                }
  -            }
  -            Log.log("JMS service stopped.");
  -            this.stopped = true;
  -     }
  -
  -    public void stopServer() {
  -        this.alive = false;
  -    }
  +                             //Wait (and sleep) until it can find something to do
  +                             synchronized (taskQueue) {
  +                                     while (queue == null && alive) {               
                         
  +                                             
  +                                             // size() is O(1) in LinkedList... 
  +                                             int size=taskQueue.size(); 
  +                                             if (size!=0) { 
  +                                                     
  +                                                     //<DEBUG>
  +                                                     queue = 
(JMSServerQueue)taskQueue.removeFirst();
  +                                                     
//queue=(JMSServerQueue)taskQueue.getFirst();
  +                                                     //</DEBUG>
  +                                                     
  +                                                     //One other thread can start 
working on the task queue...
  +                                                     if (size > 1) {
  +                                                             taskQueue.notify();
  +                                                     }
  +                                             } else {        
  +                                                     try {
  +                                                             //Log.log("I'm going 
to bed...");
  +                                                             taskQueue.wait(5000);
  +                                                             //Log.log("I wake up");
  +                                                     } catch (InterruptedException 
e) {
  +                                                     }
  +                                             }
  +                                             
  +                                     }
  +                             }
  +                             
  +                             if (alive) {
  +                                     //Ask the queue to do its job
  +                                     try {
  +                                             queue.doMyJob();
  +                                     } catch (JMSException e) {
  +                                             Log.error(e);
  +                                     }
  +                             }
  +                     }
  +                     Log.log("JMS service stopped.");
  +                     this.stopped = true;
  +     }
   
  -     // Administration calls
  -     
  +     public void stopServer() {
  +             this.alive = false;
  +     }
  +
  +     // Administration calls 
        public SpyTopic newTopic(String name) throws JMSException
        {
                Log.notice("[JMSServer] new topic : "+name);
  @@ -216,7 +215,7 @@
        }
        
        //A connection has send a new message
  -    public void newMessage(SpyMessage val[],String id) throws JMSException 
  +     public void newMessage(SpyMessage val[],String id) throws JMSException 
        {//Warning, a part is in the OIL file
                if (val.length!=1) Log.notice("INCOMING: "+val.length+" messages from 
"+id);
                
  @@ -230,7 +229,7 @@
                        //Add the message to the queue
                        queue.addMessage(val[i]);
                }
  -    }
  +     }
        
        //A connection object wants to subscribe to a Destination
        public void subscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException
  @@ -300,6 +299,8 @@
        //A connection is closing [error or notification]
        public synchronized void connectionClosing(SpyDistributedConnection 
dc,JMSServerQueue noCheck)
        {
  +             Log.log("connectionClosing(dc="+dc+",noCheck="+noCheck+")");
  +             
                if (dc==null) return;
                
                //unregister its clientID
  @@ -350,15 +351,7 @@
                securityManager.addClientID(ID);
        }
        
  -     public SpyMessage queueReceiveNoWait(Queue queue) throws JMSException
  -     {
  -             Log.log("JMSserver: queueReceiveNoWait(queue="+queue+")");
  -             
  -             JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(queue);
  -             if (serverQueue==null) throw new JMSException("This destination does 
not exist !");
  -             
  -             return serverQueue.queueReceiveNoWait();
  -     }
  +
        
        public void connectionListening(boolean mode,Destination 
dest,SpyDistributedConnection dc) throws JMSException
        {
  @@ -373,4 +366,27 @@
                Log.error("JMSServer.finalize()");
        }
   
  +     //Sent by a client to Ack or Nack a message.
  +     public void acknowledge(SpyAcknowledgementItem[] items, boolean isAck, 
SpyDistributedConnection dc) throws JMSException
  +     {
  +
  +             for( int i=0; i<items.length; i++ ) {
  +                     
  +                     JMSServerQueue 
serverQueue=(JMSServerQueue)messageQueue.get(items[i].jmsDestination);
  +                     if (serverQueue==null) throw new JMSException("Destination 
does not exist: "+items[i].jmsDestination);
  +             
  +                     serverQueue.acknowledge(dc, items[i].jmsMessageID, isAck);
  +             }
  +     }       
  +     
  +     //Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
  +     public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection 
dc) throws JMSException
  +     {
  +             Log.log("JMSserver: queueReceive(queue="+queue+",wait="+wait+")");
  +             JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(queue);
  +             if (serverQueue==null) throw new JMSException("This destination does 
not exist !");
  +             
  +             return serverQueue.queueReceive(wait,dc);
  +             
  +     }
   }
  
  
  
  1.17      +231 -170  spyderMQ/src/java/org/spydermq/JMSServerQueue.java
  
  Index: JMSServerQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueue.java,v
  retrieving revision 1.16
  retrieving revision 1.17
  diff -u -r1.16 -r1.17
  --- JMSServerQueue.java       2000/11/17 17:30:27     1.16
  +++ JMSServerQueue.java       2000/11/19 19:59:56     1.17
  @@ -19,7 +19,7 @@
    *      
    *@author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *@version $Revision: 1.16 $
  + *@version $Revision: 1.17 $
    */
   public class JMSServerQueue {
        // Attributes ----------------------------------------------------
  @@ -41,21 +41,19 @@
        private JMSServer server;
        //Am I a queue or a topic  
        boolean isTopic;
  -     //List of messages waiting for acknowledgment
  -     private LinkedList messagesWaitingForAck;
  +
        //Nb of listeners for this Queue
        int listeners;
        //Counter used to number incomming messages. (Used to order the messages.)
        long messageIdCounter = Long.MIN_VALUE;
   
  +     // Keeps track of the last used connection so that we can do round robin 
distribution of p2p messages.
  +     private JMSServerQueueReceiver lastUsedQueueReceiver;   
        // Should we use the round robin aproach to pick the next reciver of a p2p 
message?
        private boolean useRoundRobinMessageDistribution = true;
  -     // Keeps track of the last used connection so that we can do round robin 
distribution of p2p messages.
  -     private SpyDistributedConnection lastUsedConnection;
   
        
  -     // Constructor ---------------------------------------------------
  -        
  +     // Constructor ---------------------------------------------------         
        JMSServerQueue(SpyDestination dest,SpyDistributedConnection 
temporary,JMSServer server)
        {
                destination=dest;
  @@ -65,31 +63,38 @@
                alreadyInTaskQueue=false;
                temporaryDestination=temporary;
                this.server=server;
  -             messagesWaitingForAck=new LinkedList();
                isTopic=dest instanceof SpyTopic;
                listeners=0;
        }
  +
  +
        
  -     // Package protected ---------------------------------------------
  -         
  +     // Package protected ---------------------------------------------          
        void addSubscriber(SpyDistributedConnection dc) throws JMSException
        {
                //We want to avoid removeSubscriber, addSubscriber or sendOneMessage 
to work concurently
                synchronized (destination) {
                        if 
(temporaryDestination!=null&&!temporaryDestination.equals(dc)) throw new 
JMSException("You cannot subscriber to this temporary destination");
  -                     subscribers.put(dc.getClientID(),dc);
  +
  +                     Object qr = subscribers.get(dc.getClientID());
  +                     if( qr == null ) {
  +                             subscribers.put(dc.getClientID(),new 
JMSServerQueueReceiver(this,dc));
  +                     }
                }
        }
   
  +
  +     
        void removeSubscriber(SpyDistributedConnection dc,Iterator i)
        {
                //We want to avoid removeSubscriber, addSubscriber or sendOneMessage 
to work concurently
                synchronized (destination) {
  -                     
  -                     SpyDistributedConnection 
distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
  -                     if (distributedConnection==null) return;
  -                     if (distributedConnection.listeners) listeners--;
                        
  +                     JMSServerQueueReceiver 
qr=(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
  +                     if (qr==null) return;
  +
  +                     qr.close();
  +                                             
                        if (i==null) {
                                if (subscribers.remove(dc.getClientID())==null) 
Log.notice("WARNING: Could not remove "+dc.getClientID());
                        } else {
  @@ -99,7 +104,9 @@
                }
        }
   
  -     public void addMessage(SpyMessage mes) throws JMSException
  +
  +     
  +     public void addMessage(SpyMessage mes)
        {
                //Add a message to the message list... 
                synchronized (messages) 
  @@ -120,6 +127,8 @@
                }
        }
   
  +
  +     
        //Clear the message queue
        synchronized SpyMessage[] startWork()
        {
  @@ -138,7 +147,9 @@
                        return mes;
                }
        }
  +
        
  +             
        synchronized SpyMessage startWorkQueue()
        {
                synchronized (messages) {
  @@ -152,7 +163,9 @@
                        return m;
                }
        }
  +
        
  +             
        void endWork()
        {
                //The thread has finished his work...
  @@ -168,6 +181,8 @@
                }
        }
   
  +
  +     
        void sendOneMessage(SpyMessage mes)
        {
                //we can only add/remove a subscribers once the message is sent ( 
iterator is fail-fast )
  @@ -177,18 +192,20 @@
                        Iterator i=subscribers.values().iterator();
                                
                        while (i.hasNext()) {
  -                             SpyDistributedConnection 
dc=(SpyDistributedConnection)i.next();
  +                             JMSServerQueueReceiver 
qr=(JMSServerQueueReceiver)i.next();
                                try {
  -                                     dc.cr.receive(destination,mes);
  +                                     qr.sendOneMessage(mes);
                                } catch (Exception e) {
  -                                     Log.notice("Cannot deliver this message to the 
client "+dc);                                    
  +                                     Log.notice("Cannot deliver this message to the 
client: "+qr.dc.getClientID());                                  
                                        Log.notice(e);
  -                                     handleConnectionFailure(dc,i);
  +                                     handleConnectionFailure(qr.dc,i);
                                } 
                        }       
                }
        }
   
  +
  +     
        void sendMultipleMessages(SpyMessage mes[])
        {
                synchronized (subscribers) {
  @@ -197,13 +214,13 @@
                        Iterator i=subscribers.values().iterator();
                                
                        while (i.hasNext()) {
  -                             SpyDistributedConnection 
dc=(SpyDistributedConnection)i.next();
  +                             JMSServerQueueReceiver 
qr=(JMSServerQueueReceiver)i.next();
                                try {
  -                                     dc.cr.receiveMultiple(destination,mes);
  +                                     qr.sendMultipleMessages(mes);
                                } catch (Exception e) {
  -                                     Log.error("Cannot deliver those messages to 
the client "+dc.getClientID());                                     
  +                                     Log.error("Cannot deliver those messages to 
the client "+qr.dc.getClientID());                                  
                                        Log.error(e);
  -                                     handleConnectionFailure(dc,i);
  +                                     handleConnectionFailure(qr.dc,i);
                                } 
                        }       
                }
  @@ -216,7 +233,9 @@
                Log.notice("Warning: The DistributedConnection was still registered 
for "+destination);
                removeSubscriber(dc,null);
        }
  +
        
  +             
        void notifyWorkers()
        {
                //It is useless to put many times the same destination in the task 
queue
  @@ -228,7 +247,9 @@
                        server.taskQueue.notify();
                }
        }
  +
        
  +             
        private void handleConnectionFailure(SpyDistributedConnection dc,Iterator i)
        {
                //We should try again :) This behavior should under control of a 
Failure-Plugin         
  @@ -241,79 +262,8 @@
                removeSubscriber(dc,i);
        }
                
  -     /**
  -      * Get a SpyDistributedConnection object that is listening 
  -      * to this queue.  If multiple objects are listening to the queue
  -      * this multiple calls to this method will cycle through them in a round 
  -      * robin fasion.
  -      */
  -     private SpyDistributedConnection pickNextRoundRobinConnection() {
  -
  -             // No valid next connection will exist, return null
  -             if (listeners == 0)
  -                     return null;
  -
  -             Iterator i = subscribers.values().iterator();
  -             SpyDistributedConnection firstFoundConnection = null;
  -             boolean enableSelectNext = false;
  -
  -             while (i.hasNext()) {
  -                     SpyDistributedConnection t = (SpyDistributedConnection) 
i.next();
  -
  -                     // Select the next valid connection if we are past the last 
used connection
  -                     if (t == lastUsedConnection || lastUsedConnection == null)
  -                             enableSelectNext = true;
  -
  -                     // Test to see if the connection is valid pick
  -                     if (t.listeners) {
  -                             // Store the first valid connection since the last 
used might be the last
  -                             // in the list 
  -                             if (firstFoundConnection == null)
  -                                     firstFoundConnection = t;
  -
  -                             // Are we past the last used? then we have the next 
item in the round robin  
  -                             if (enableSelectNext && t != lastUsedConnection) {
  -                                     lastUsedConnection = t;
  -                                     return t;
  -                             }
  -                     }
  -             }
   
  -             // We got here because we did not find a valid item in the list after 
the last
  -             // used item, so lest use the first valid item 
  -             if (firstFoundConnection != null) {
  -                     lastUsedConnection = firstFoundConnection;
  -                     return firstFoundConnection;
  -             } else {
  -                     Log.error("FIXME: The listeners count was invalid !");
  -                     return null;
  -             }
  -     }
   
  -     /**
  -      * Get a SpyDistributedConnection object that is listening 
  -      * to this queue.  Picks the first one it can find.
  -      */
  -     private SpyDistributedConnection pickFirstFoundConnection() {
  -
  -             // No valid next connection will exist, return null
  -             if (listeners == 0)
  -                     return null;
  -
  -             Iterator i = subscribers.values().iterator();
  -             while (i.hasNext()) {
  -                     SpyDistributedConnection t = (SpyDistributedConnection) 
i.next();
  -
  -                     // Test to see if the connection is valid pick
  -                     if (t.listeners) {
  -                             return t;
  -                     }
  -             }
  -
  -             // We got here because we did not find a valid item in the list.
  -             Log.error("FIXME: The listeners count was invalid !");
  -             return null;
  -     }
                
        void doMyJob() throws JMSException 
        {                       
  @@ -323,23 +273,23 @@
                        SpyMessage[] msgs=startWork();
                                
                        //Let the thread do its work
  -                     if (msgs.length>1) {
  +                     if (msgs.length == 1) {
  +                             
  +                             if (!msgs[0].isOutdated()) {
  +                                     sendOneMessage(msgs[0]);
  +                             }
  +                                     
  +                     } else if (msgs.length>1) {
                                //We can send multiple messages
  -                             Log.log("DISPATCH: "+msgs.length+" messages => 
"+destination);
                                sendMultipleMessages(msgs);
  -                     } else {        
  -                             //Send each message
  -                             for(int i=0;i<msgs.length;i++) {
  -                                     SpyMessage message=(SpyMessage)msgs[i];
  -                                     Log.log("DISPATCH: "+message+" => 
"+destination);
  -                                     if (!message.isOutdated()) 
sendOneMessage(message);
  -                             }                       
                        }
                                
                        //Notify that it has finished its work : another thread can 
start working on this queue
                        endWork();
                        
                } else {
  +
  +                     Log.log("Dispatching messages");
                        
                        synchronized (this) {
                                //In the Queue case, we synchronize on [this] to avoid 
changes (listening modifications)
  @@ -350,66 +300,52 @@
                                        //At first, find a receiver
                                        //NL: We could find a better receiver (load 
balancing ?)
                                        //HC: Using Round Robin should provide some 
load balancing
  -                                     
  -                                     Log.log("get a receiver");
  -                                     
  -                                     // we may have to restore the 
lastUsedConnection
  -                                     // if message on the queue is not sent. (we 
don't want to skip 
  -                                     // destination in the round robin)
  -                                     SpyDistributedConnection 
saveLastConnection=lastUsedConnection;
  -                                     SpyDistributedConnection dc;
  -                                                                             
  -                                                                             if( 
useRoundRobinMessageDistribution ) {
  -                                                                                    
 dc=pickNextRoundRobinConnection();
  -                                                                             } else 
{
  -                                                                                    
 dc=pickFirstFoundConnection();
  -                                                                             }
  -                                                                             if ( 
dc == null ) break;
  -                                                                             
  +                                                                     
                                        //Get the message ( if there is one message 
pending )
                                        SpyMessage mes=startWorkQueue();
                                        if (mes==null) {
  -                                             lastUsedConnection=saveLastConnection;
  +                                             Log.log("Done dispatching messages: No 
more message to send");
                                                break;
                                        }
  -                                     if (mes.isOutdated()) {
  -                                             lastUsedConnection=saveLastConnection;
  +                                     
  +                                     if (mes.isOutdated()) 
                                                continue;
  +                                     
  +                                     // we may have to restore the 
lastUsedQueueReceiver
  +                                     // if message on the queue is not sent. (we 
don't want to skip 
  +                                     // destination in the round robin)
  +                                     JMSServerQueueReceiver qr;
  +                                     if( useRoundRobinMessageDistribution ) {
  +                                             qr=pickNextRoundRobinQueueReceiver();
  +                                     } else {
  +                                             qr=pickFirstFoundQueueReceiver();
                                        }
  -                                                                     
  +                                     if ( qr == null ) {
  +                                             restoreMessage(mes);
  +                                             Log.log("Done dispatching messages: No 
receiver available for dispatch");
  +                                             break;
  +                                     }
  +                                                                                    
                                                                 
                                        //Send the message
                                        try {
  -                                             dc.cr.receive(destination,mes);
  +                                             qr.sendOneMessage(mes);
                                        } catch (NoReceiverException e) {
                                                
                                                //Log.log(e);
  -                                             Log.log("There was no receiver for the 
client "+dc);
  +                                             Log.log("Got a NoReceiverException 
from: "+qr.dc.getClientID());
  +                                             restoreMessage(mes);    
  +                                             qr.setListening(false);
   
  -                                             try {
  -                                                     synchronized (messages) {
  -                                                             messages.add(mes);     
 
  -                                                     }
  -                                                     connectionListening(false,dc);
  -                                             } catch (Exception e2) {
  -                                                     Log.error(e2);
  -                                             }                               
  -
                                        } catch (JMSException e) {
                                                  throw e;
                                        } catch (Exception e) {
                                                //This is a transport failure. We 
should define our own Transport Failure class
  -                                             //for a better execption catching      
                                                                                   
  -                                             try {
  -                                                     synchronized (messages) {
  -                                                             messages.add(mes);     
 
  -                                                     }
  -                                             } catch (Exception e2) {
  -                                                     Log.error(e2);
  -                                             }
  +                                             //for a better exception catching
                                                
  -                                             Log.error("Cannot deliver this message 
to the client "+dc);                                     
  +                                             restoreMessage(mes);                   
                         
  +                                             Log.error("Cannot deliver this message 
to the client "+qr.dc.getClientID());
                                                Log.error(e);
  -                                             handleConnectionFailure(dc,null);
  +                                             handleConnectionFailure(qr.dc,null);
                                        } 
                                                                        
                                }
  @@ -421,40 +357,20 @@
                }
        }
        
  -     SpyMessage queueReceiveNoWait()
  -     {
  -             synchronized (messages) {
  -                     if (messages.size()==0) return null;
  -                     SpyMessage m = (SpyMessage)messages.first();
  -                     messages.remove(m);
  -                     return m;
  -             }
  -     }
  +
                
        void connectionListening(boolean mode,SpyDistributedConnection dc) throws 
JMSException 
        {
        
  -             // Before check 
  -             
  +             // Before check                 
                // Synchronized code : We want to avoid sending messages while we are 
changing the connection status 
                synchronized (this) {
                        
                        
  -                     SpyDistributedConnection 
distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
  -                     if (distributedConnection==null) throw new JMSException("This 
DistributedConnection is not registered");
  -             
  -                     if (mode) {                     
  -                             if (!distributedConnection.listeners) {
  -                                     distributedConnection.listeners=true;
  -                                     listeners++;
  -                             }
  -                     } else {
  -                             if (distributedConnection.listeners) {
  -                                     distributedConnection.listeners=false;
  -                                     listeners--;
  -                             }
  -                     }
  +                     JMSServerQueueReceiver 
qr=(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
  +                     if (qr==null) throw new JMSException("This 
DistributedConnection is not registered");
   
  +                     qr.setListening(mode);
                
                        if (listeners!=0&&!threadWorking&&!alreadyInTaskQueue) {
                                synchronized (messages) {
  @@ -462,7 +378,152 @@
                                }
                        }
                
  -                     Log.log("Listeners for "+destination+" = "+listeners);
                }               
        }       
  +
  +
  +     
  +     public void acknowledge(SpyDistributedConnection dc, String messageId, boolean 
isAck) throws JMSException       {
  +     
  +             JMSServerQueueReceiver qr = 
(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
  +             if( qr==null )
  +                     throw new JMSException("You have not subscribed to this 
destination.");
  +                     
  +             qr.acknowledge(messageId, isAck);
  +             
  +     }       
  +
  +     
  +             
  +     /**
  +      * Get a JMSServerQueueReceiver object that is listening 
  +      * to this queue.  Picks the first one it can find.
  +      */
  +     private JMSServerQueueReceiver pickFirstFoundQueueReceiver() {
  +
  +             // No valid next connection will exist, return null
  +             if (listeners == 0)
  +                     return null;
  +
  +             Iterator i = subscribers.values().iterator();
  +             while (i.hasNext()) {
  +                     JMSServerQueueReceiver t = (JMSServerQueueReceiver) i.next();
  +
  +                     // Test to see if the connection is a valid pick
  +                     if (t.isListening()) {
  +                             return t;
  +                     }
  +             }
  +
  +             // We got here because we did not find a valid item in the list.
  +             Log.error("FIXME: The listeners count was invalid !");
  +             return null;
  +     }       
  +
  +     
  +             
  +     /**
  +      * Get a JMSServerQueueReceiver object that is listening 
  +      * to this queue.  If multiple objects are listening to the queue,
  +      * then multiple calls to this method will cycle through them in a 
  +      * round-robin fashion.
  +      */
  +     private JMSServerQueueReceiver pickNextRoundRobinQueueReceiver() {
  +
  +             // No valid next connection will exist, return null
  +             if (listeners == 0)
  +                     return null;
  +
  +             Iterator i = subscribers.values().iterator();
  +             JMSServerQueueReceiver firstFoundConnection = null;
  +             boolean enableSelectNext = false;
  +
  +             while (i.hasNext()) {
  +                     JMSServerQueueReceiver t = (JMSServerQueueReceiver) i.next();
  +
  +                     // Select the next valid connection if we are past the last 
used connection
  +                     if (t == lastUsedQueueReceiver || lastUsedQueueReceiver == 
null)
  +                             enableSelectNext = true;
  +
  +                     // Test to see if the connection is a valid pick
  +                     if (t.isListening()) {
  +                             // Store the first valid connection since the last 
used might be the last
  +                             // in the list 
  +                             if (firstFoundConnection == null)
  +                                     firstFoundConnection = t;
  +
  +                             // Are we past the last used? then we have the next 
item in the round robin  
  +                             if (enableSelectNext && t != lastUsedQueueReceiver) {
  +                                     lastUsedQueueReceiver = t;
  +                                     return t;
  +                             }
  +                     }
  +             }
  +
  +             // We got here because we did not find a valid item in the list after 
the last
  +             // used item, so let's use the first valid item 
  +             if (firstFoundConnection != null) {
  +                     lastUsedQueueReceiver = firstFoundConnection;
  +                     return firstFoundConnection;
  +             } else {
  +                     Log.error("FIXME: The listeners count was invalid !");
  +                     return null;
  +             }
  +     }       
  +
  +     
  +     
  +             
  +     //Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
  +     SpyMessage queueReceive(long wait, SpyDistributedConnection dc) throws  
JMSException
  +     {
  +             
  +             // Synchronized code : We want to avoid sending messages while we are 
changing the connection status 
  +             synchronized (this) {
  +                     JMSServerQueueReceiver 
qr=(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
  +                     if (qr==null) throw new JMSException("This 
DistributedConnection is not registered");
  +
  +                     if( wait < 0 ) {
  +                             synchronized (messages) {
  +                                     if (messages.size()==0) return null;
  +                                     SpyMessage m = (SpyMessage)messages.first();
  +                                     messages.remove(m);
  +                                     return m;
  +                             }
  +                     } else {
  +                             
  +                             qr.addReceiver(wait);
  +                             
  +                             if (listeners!=0&&!threadWorking&&!alreadyInTaskQueue) 
{
  +                                     synchronized (messages) {
  +                                             if (!messages.isEmpty()) 
notifyWorkers();
  +                                     }
  +                             }
  +                     
  +                             return null;
  +                     }
  +             }
  +     }       
  +
  +     
  +             
  +     //Used to put a message that was added previously to the queue, back in the 
queue
  +     public void restoreMessage(SpyMessage mes) 
  +     {
  +             //restore a message to the message list...
  +             synchronized (messages) {
  +                     messages.add(mes);      
  +                     
  +                     if (isTopic) {
  +                             //if a thread is already working on this destination, 
I don't have to add myself to the task queue
  +                             if (!threadWorking) notifyWorkers();
  +                     } else {
  +                             if (listeners!=0&&!threadWorking) notifyWorkers();
  +                     }
  +
  +             }
  +     }
  +
  +
  +     
   }
  
  
  
  1.15      +256 -92   spyderMQ/src/java/org/spydermq/SpyConnection.java
  
  Index: SpyConnection.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnection.java,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -r1.14 -r1.15
  --- SpyConnection.java        2000/11/16 22:39:46     1.14
  +++ SpyConnection.java        2000/11/19 19:59:56     1.15
  @@ -31,7 +31,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.14 $
  + *   @version $Revision: 1.15 $
    */
   public class SpyConnection 
        implements Connection, Serializable
  @@ -140,14 +140,22 @@
                if (!modeStop) return;
                modeStop=false;
   
  -             Iterator i=destinations.values().iterator();
  +             Iterator i=destinations.keySet().iterator();
                while (i.hasNext()) {
  -                     ConnectionQueue cq=(ConnectionQueue)i.next();
  -                     cq.start();
  +                     Destination d=(Destination)i.next();
  +                     ConsumerSet ci=(ConsumerSet)destinations.get(d);
  +
  +                     if ( ci.getLasListeningState() ) {
  +                             try {
  +                                     
provider.connectionListening(true,d,distributedConnection);
  +                             } catch ( Exception e ) {
  +                                     failureHandler(e, "Cannot contact the JMS 
server");
  +                             }
  +                     }
  +                             
                }
   
  -             changeModeStop(modeStop);
  -             
  +             changeModeStop(modeStop);               
        }
   
        public void stop() throws JMSException
  @@ -157,11 +165,20 @@
                
                if (modeStop) return;           
                modeStop=true;                          
  -             
  -             Iterator i=destinations.values().iterator();
  +
  +             Iterator i=destinations.keySet().iterator();
                while (i.hasNext()) {
  -                     ConnectionQueue cq=(ConnectionQueue)i.next();
  -                     cq.stop();
  +                     Destination d=(Destination)i.next();
  +                     ConsumerSet ci=(ConsumerSet)destinations.get(d);
  +
  +                     if ( ci.getLasListeningState() ) {
  +                             try {
  +                                     
provider.connectionListening(false,d,distributedConnection);
  +                             } catch ( Exception e ) {
  +                                     failureHandler(e, "Cannot contact the JMS 
server");
  +                             }
  +                     }
  +                             
                }
   
                changeModeStop(modeStop);
  @@ -278,75 +295,9 @@
                }
        }
        
  -     //A Session has created a new MessageConsumer for the Destination dest
  -     void addSession(Destination dest, SpySession who) throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The connection is 
closed");                
  -             if (distributedConnection==null) createReceiver();
  -                                                                      
  -             Log.log("Connection: addSession(dest="+dest.toString()+")");
  -                             
  -             
  -             try {
  -
  -                     synchronized (destinations) {
  -                             
  -                             ConnectionQueue 
connectionQueue=(ConnectionQueue)destinations.get(dest);
  -                             
  -                             if (connectionQueue==null) {                    
  -                                     connectionQueue=new ConnectionQueue(dest,this);
  -                                     connectionQueue.addSession(who);
  -                                     HashMap 
newDestinations=(HashMap)destinations.clone();
  -                                     newDestinations.put(dest,connectionQueue);
  -                                     destinations=newDestinations;
  -                                     provider.subscribe(dest,distributedConnection);
  -                             } else {                        
  -                                     connectionQueue.addSession(who);               
         
  -                             }
  -                     }
  -
  -             } catch (Exception e) {
  -                     failureHandler(e,"Cannot subscribe to this Destination");
  -             }       
                
  -     }               
        
  -     //The session does not need to recieve the messages to Destination dest
  -     void removeSession(Destination dest, SpySession who) throws JMSException
  -     {
  -             if (distributedConnection==null) createReceiver();
  -             
  -             Log.log("Connection: removeSession(dest="+dest.toString()+")");
  -             
  -             try {
  -                     
  -                     synchronized (destinations) {
  -                             
  -                             ConnectionQueue 
connectionQueue=(ConnectionQueue)destinations.get(dest);
  -                             
  -                             if (connectionQueue!=null) {
  -                                     boolean 
empty=connectionQueue.removeSession(who);
  -                                     if (empty) {
  -                                             HashMap 
newDestinations=(HashMap)destinations.clone();
  -                                             newDestinations.remove(dest);
  -                                             destinations=newDestinations;
  -                                             
provider.unsubscribe(dest,distributedConnection);
  -                                     } 
  -                             } else {
  -                                     //this should not happen
  -                                     HashMap 
newDestinations=(HashMap)destinations.clone();
  -                                     newDestinations.remove(dest);
  -                                     destinations=newDestinations;
  -                                     
provider.unsubscribe(dest,distributedConnection);
  -                             }
  -                             
  -                     }
  -                     
  -             } catch (Exception e) {
  -                     failureHandler(e,"Cannot unsubscribe to this destination");
  -             }
   
  -     }
        
        //Get a new messageID (creation of a new message)
        String getNewMessageID() throws JMSException
  @@ -383,15 +334,7 @@
                //We could check this, though
        }
        
  -     SpyMessage queueReceiveNoWait(Queue queue) throws JMSException 
  -     {
  -             try {
  -                     return provider.queueReceiveNoWait(queue);
  -             } catch (Exception e) {
  -                     failureHandler(e,"Cannot create a ConnectionReceiver");
  -                     return null;
  -             }
  -     }
  +
        
        // Protected -------------------------------------------------------
   
  @@ -401,6 +344,7 @@
                try {
                        if (clientID==null) askForAnID();
                        
distributedConnection=ConnectionReceiverFactory.createDistributedConnection(clientID,this,crClassName);
  +                     provider.setSpyDistributedConnection(distributedConnection);
                } catch (Exception e) {
                        failureHandler(e,"Cannot create a ConnectionReceiver");
                }
  @@ -416,9 +360,10 @@
                }               
        }
   
  +     
        public void failureHandler(Exception e,String reason) throws JMSException
        {
  -             Log.error(e);
  +             e.printStackTrace();
                
                JMSException excep=new JMSException(reason);
                excep.setLinkedException(e);
  @@ -431,11 +376,230 @@
                
                throw excep;
        }
  +
  +             
  +     protected void acknowledge(Destination dest, String messageId, boolean isAck) 
throws JMSException 
  +     {
  +             try {
  +                     SpyAcknowledgementItem item[] = { new SpyAcknowledgementItem() 
};                       
  +                     item[0].jmsDestination = dest;
  +                     item[0].jmsMessageID = messageId;
  +                     provider.acknowledge(item, isAck,distributedConnection);
  +             } catch (Exception e) {
  +                     failureHandler(e,"Cannot acknowlege a message.");
  +             }               
  +     }       
  +
  +
        
  -/**
  - * Creation date: (11/16/2000 2:20:22 PM)
  - * @return org.spydermq.distributed.interfaces.DistributedJMSServer
  - */
  -public org.spydermq.distributed.interfaces.DistributedJMSServer getProvider() {
  -     return provider;
  -}}
  +     public DistributedJMSServer getProvider() {
  +             return provider;
  +     }
  +     
  +
  +     // Used to ack/nak a set of messages.   
  +     protected void acknowledge(SpyAcknowledgementItem[] items, boolean isAck) 
throws JMSException  {
  +             
  +             try {
  +                     provider.acknowledge(items, isAck,distributedConnection);
  +             } catch (Exception e) {
  +                     failureHandler(e,"Cannot acknowlege a message.");
  +             }
  +             
  +     }       
  +
  +     // The ConsumerSet inner class is used by:
  +     //
  +     //              addConsumer()
  +     //      removeConsumer()
  +     //      getConsumers()
  +     //      listenerChange()
  +     //              pickListeningConsumer()
  +     //
  +     class ConsumerSet extends HashSet {
  +             boolean lasListeningState=false;
  +             
  +             boolean getLasListeningState() {
  +                     return lasListeningState;
  +             }
  +                     
  +             boolean listenStateChanged() { 
  +                     boolean t = false;
  +                     
  +                     Iterator iter = iterator();
  +                     while( iter.hasNext() ) {
  +                             SpyMessageConsumer c = (SpyMessageConsumer)iter.next();
  +                             if( c.isListening() ) {
  +                                     t = true;
  +                                     break;
  +                             }
  +                     }
  +                     
  +                     if( t == lasListeningState ) {
  +                             return false;
  +                     }
  +                     lasListeningState = t;
  +                     return true;
  +             }
  +     }
  +             
  +     //A new Consumer has been created for the Destination dest
  +     void addConsumer(Destination dest, SpyMessageConsumer consumer) throws 
JMSException
  +     {
  +             if (closed) throw new IllegalStateException("The connection is 
closed");                
  +             if (distributedConnection==null) createReceiver();
  +                                                                      
  +             Log.log("Connection: addConsumer(dest="+dest.toString()+")");
  +
  +             try {
  +
  +                     synchronized (destinations) {
  +                             
  +                             ConsumerSet 
consumerSet=(ConsumerSet)destinations.get(dest);
  +                             
  +                             if (consumerSet==null) {                        
  +                                     consumerSet=new ConsumerSet();
  +                                     consumerSet.add(consumer);
  +                                     HashMap 
newDestinations=(HashMap)destinations.clone();
  +                                     newDestinations.put(dest,consumerSet);
  +                                     destinations=newDestinations;
  +                                     provider.subscribe(dest,distributedConnection);
  +                             } else {                        
  +                                     consumerSet.add(consumer);
  +                             }
  +                     }
  +
  +             } catch (Exception e) {
  +                     failureHandler(e,"Cannot subscribe to this Destination");
  +             }       
  +             
  +     }
  +     
  +
  +     //A consumer does not need to receive the messages from a Destination 
  +     void removeConsumer(Destination dest, SpyMessageConsumer who) throws 
JMSException {
  +             
  +             if (distributedConnection==null) createReceiver();
  +             
  +             Log.log("Connection: removeSession(dest="+dest.toString()+")");
  +             
  +             try {
  +                     
  +                     synchronized (destinations) {
  +                             
  +                             ConsumerSet 
consumerSet=(ConsumerSet)destinations.get(dest);
  +                             
  +                             if (consumerSet!=null) {
  +                                     boolean empty=consumerSet.remove(who);
  +                                     if (empty) {
  +                                             HashMap 
newDestinations=(HashMap)destinations.clone();
  +                                             newDestinations.remove(dest);
  +                                             destinations=newDestinations;
  +                                             
provider.unsubscribe(dest,distributedConnection);
  +                                     } 
  +                             } else {
  +                                     //this should not happen
  +                                     HashMap 
newDestinations=(HashMap)destinations.clone();
  +                                     newDestinations.remove(dest);
  +                                     destinations=newDestinations;
  +                                     
provider.unsubscribe(dest,distributedConnection);
  +                             }
  +                             
  +                     }
  +                     
  +             } catch (Exception e) {
  +                     failureHandler(e,"Cannot unsubscribe to this destination");
  +             }
  +
  +     }       
  +
  +             
  +     //Gets all the consumers subscribed to a destination
  +     public SpyMessageConsumer[] getConsumers(Destination dest) throws JMSException 
{
  +             
  +             if (closed) throw new IllegalStateException("The connection is 
closed");                
  +             if (distributedConnection==null) createReceiver();
  +     
  +             synchronized (destinations) {
  +                     ConsumerSet consumerSet=(ConsumerSet)destinations.get(dest);   
                         
  +                     if (consumerSet==null || consumerSet.size()==0) 
  +                             return null;
  +                     
  +                     SpyMessageConsumer rc[]=new 
SpyMessageConsumer[consumerSet.size()];
  +                     return (SpyMessageConsumer[])consumerSet.toArray(rc);
  +             }
  +                                     
  +     }
  +
  +     //Gets the first consumer that is listening to a destination.   
  +     public SpyMessageConsumer pickListeningConsumer(Destination dest) throws 
JMSException {
  +             
  +             if (closed) throw new IllegalStateException("The connection is 
closed");                
  +             if (distributedConnection==null) createReceiver();
  +                                                                      
  +             synchronized (destinations) {
  +                     
  +                     ConsumerSet consumerSet=(ConsumerSet)destinations.get(dest);
  +                     
  +                     if (consumerSet==null || consumerSet.size()==0) {
  +                             return null;
  +                     } else {                        
  +                             Iterator i = consumerSet.iterator();
  +                             while( i.hasNext() ) {
  +                                     SpyMessageConsumer c = 
(SpyMessageConsumer)i.next();
  +                                     if( c.isListening() || c.waitInReceive ) {
  +                                             return c;
  +                                     }
  +                             }
  +                     }
  +             }
  +
  +             return null;
  +             
  +     }
  +     
  +     
  +
  +     
  +     /**
  +      * Called whenever a consumer changes its listening state on a destination.
  +      * Checks whether that change altered the overall listening state for the 
destination and, if so, notifies the provider via connectionListening().
  +      * Creation date: (11/16/2000 2:20:22 PM)
  +      * @param d the destination whose consumer listening state may have changed
  +      */             
  +     public void listenerChange(Destination d) throws JMSException {
  +             
  +             if (closed) throw new IllegalStateException("The connection is 
closed");
  +             if (distributedConnection==null) createReceiver();
  +                     
  +             ConsumerSet ci=(ConsumerSet)destinations.get(d);
  +             if( ci.listenStateChanged() ) {
  +                     try {
  +                             if ( ci.getLasListeningState() ) {
  +                                     
provider.connectionListening(true,d,distributedConnection);
  +                             } else {
  +                                     
provider.connectionListening(false,d,distributedConnection);
  +                             }
  +                     } catch ( Exception e ) {
  +                             failureHandler(e, "Cannot contact the JMS server");
  +                     }
  +             }
  +             
  +     }       
  +     
  +     
  +     /**
  +      * Creation date: (11/16/2000 2:20:22 PM)
  +      * @return the SpyMessage returned by provider.queueReceive() (may be null)
  +      */              
  +     SpyMessage queueReceive(Queue queue, long wait) throws JMSException {
  +             
  +             try {
  +                     return provider.queueReceive(queue, 
wait,distributedConnection);
  +             } catch (Exception e) {
  +                     failureHandler(e,"Cannot create a ConnectionReceiver");
  +                     return null;
  +             }
  +     }
  +     
  +}
  
  
  
  1.7       +7 -6      spyderMQ/src/java/org/spydermq/SpyDistributedConnection.java
  
  Index: SpyDistributedConnection.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyDistributedConnection.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- SpyDistributedConnection.java     2000/11/04 13:30:23     1.6
  +++ SpyDistributedConnection.java     2000/11/19 19:59:57     1.7
  @@ -6,16 +6,18 @@
    */
   package org.spydermq;
   
  +import java.util.HashMap;
  +import java.io.Serializable;
   import org.spydermq.distributed.interfaces.ConnectionReceiver;
   import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
  -import java.io.Serializable;
  +
   
   /**
    *   This class is the broker point of view on a SpyConnection (it contains a 
ConnectionReceiver)
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.6 $
  + *   @version $Revision: 1.7 $
    */
   public class SpyDistributedConnection 
        implements Serializable
  @@ -24,7 +26,6 @@
        private String clientID;
        private int hash;
        public transient ConnectionReceiverSetup cr_server;
  -     public transient boolean listeners;
        public ConnectionReceiver cr;
        
        public SpyDistributedConnection(String clientID,ConnectionReceiverSetup 
cr_server)
  @@ -47,9 +48,9 @@
        
        public boolean equals(Object obj)
        {
  -        // Fixes NPE. Patch submitted by John Ellis (10/29/00)
  -        if (obj==null) return false;
  -        
  +             // Fixes NPE. Patch submitted by John Ellis (10/29/00)
  +             if (obj==null) return false;
  +             
                if (obj.getClass()!=SpyDistributedConnection.class) return false;
                if (obj.hashCode()!=hash) return false;
                if (clientID==null) return true;
  
  
  
  1.7       +49 -21    spyderMQ/src/java/org/spydermq/SpyMessage.java
  
  Index: SpyMessage.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessage.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- SpyMessage.java   2000/11/17 17:30:27     1.6
  +++ SpyMessage.java   2000/11/19 19:59:57     1.7
  @@ -15,8 +15,15 @@
   import java.util.Enumeration;
   import java.util.Hashtable;
   import java.io.Serializable;
  -
   import java.lang.Comparable;
  +
  +/**
  + *   This class implements javax.jms.Message
  + *      
  + *   @author Norbert Lataille ([EMAIL PROTECTED])
  + * 
  + *   @version $Revision: 1.7 $
  + */
   public class SpyMessage 
        implements Serializable, Cloneable, Message, Comparable
   {
  @@ -58,20 +65,21 @@
        //Message body
        protected boolean msgReadOnly=false;
        
  -     //Those attributes are transient ---------------
  -     
  -     //For acknowledgment
  -     private transient SessionQueue mySessionQueue;
  +     //For acknowledgment (set on the client side)
  +     private transient SpySession spySession;
  +
        //For the storage in the JMSServerQueue object
        public transient SpyDistributedConnection originalDistributedConnection;
  -
  +     //For ordering in the JMSServerQueue
  +     public transient long messageId;
  +     
        // Constructor ---------------------------------------------------
           
        SpyMessage()
        {
                prop=new Hashtable();
                propReadWrite=true;
  -             mySessionQueue=null;
  +             spySession=null;
        }       
   
        // Public --------------------------------------------------------
  @@ -401,18 +409,15 @@
        
        public void acknowledge() throws JMSException
        {
  -             //There is no need to acknowledge() this message
  -             if (mySessionQueue==null) return;
  +             if (spySession==null) 
  +                     throw new JMSException("This message was not recieved from the provider");
  +                     
  +             if( spySession.acknowledgeMode == spySession.CLIENT_ACKNOWLEDGE )
  +                     doAcknowledge();
                
  -             mySessionQueue.acknowledge(this);
        }
   
  -     // Package protected ---------------------------------------------
  -         
  -     void setSessionQueue(SessionQueue sessionQueue)
  -     {
  -             mySessionQueue=sessionQueue;
  -     }
  +
   
        void setReadOnlyMode()
        {
  @@ -435,9 +440,9 @@
                long ts=System.currentTimeMillis();
                return jmsExpiration<ts;
        }
  -                     
  -     //For ordering in the JMSServerQueue
  -     public transient long messageId;        /**
  +     
  +             
  +     /**
         * Return a negative number if this message should be sent
         * before the o message. Return a positive if should be sent
         * after the o message.
  @@ -452,6 +457,29 @@
                if( jmsPriority < sm.jmsPriority ) {
                        return 1;
                }
  -             return (int)(messageId - sm.messageId);
  +             return (int)(messageId - sm.messageId);         
  +     }  
  +
  +     
  +     public void doAcknowledge() throws JMSException
  +     {
  +             spySession.getConnection().acknowledge(jmsDestination, jmsMessageID, true);
  +     }       
  +     
  +     
  +     public void doNegAcknowledge() throws JMSException
  +     {
  +             spySession.getConnection().acknowledge(jmsDestination, jmsMessageID, false);
  +     }
  +
                
  -     }      }
  +     public SpySession getSpySession() {
  +             return spySession;
  +     }
  +
  +     
  +     public void setSpySession(SpySession newSpySession) {
  +             spySession = newSpySession;
  +     }
  +
  +}
  
  
  
  1.4       +64 -61    spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SpyMessageConsumer.java   2000/06/09 20:03:58     1.3
  +++ SpyMessageConsumer.java   2000/11/19 19:59:57     1.4
  @@ -11,7 +11,9 @@
   import javax.jms.MessageListener;
   import javax.jms.Message;
   import javax.jms.Session;
  +import javax.jms.Destination;
   import java.util.LinkedList;
  +import java.util.Iterator;
   import org.spydermq.selectors.Selector;
   
   /**
  @@ -19,7 +21,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class SpyMessageConsumer 
        implements MessageConsumer
  @@ -27,7 +29,7 @@
        // Attributes ----------------------------------------------------
   
        //Link to my session
  -     protected SpySession session;
  +     public SpySession session;
        //My message listener (null if none)
        MessageListener messageListener;
        //Am I closed ?
  @@ -36,12 +38,14 @@
        public Selector selector;
        //The message selector
        public String messageSelector;
  -     //A link to my session queue (in my session)
  -     protected SessionQueue sessionQueue;
  +
        //List of Pending messages (not yet delivered)
        LinkedList messages;
        //Is the consumer sleeping in a receive() ?
        boolean waitInReceive;
  +     public Destination destination; 
  +     //If the session is transacted: contains JMSMessageId's of messages consumed
  +     LinkedList messagesConsumed;    
        
        // Constructor ---------------------------------------------------
           
  @@ -54,30 +58,30 @@
                messageSelector=null;
                messages=new LinkedList();
                waitInReceive=false;
  +             if( session.transacted ) {
  +                     messagesConsumed = new LinkedList();
  +             }
        }
        
  -     void setSessionQueue(SessionQueue sessionQueue)
  -     {
  -             this.sessionQueue=sessionQueue;
  -     }               
  +             
        
        // Public --------------------------------------------------------
   
  -    public String getMessageSelector() throws JMSException
  +     public String getMessageSelector() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is closed");   
                                
                return messageSelector;
        }
   
  -    public MessageListener getMessageListener() throws JMSException
  +     public MessageListener getMessageListener() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is closed");   
                
                return messageListener;
        }
   
  -    public void setMessageListener(MessageListener listener) throws JMSException
  +     public void setMessageListener(MessageListener listener) throws JMSException
        {       
                if (closed) throw new IllegalStateException("The MessageConsumer is closed");   
                if (waitInReceive) throw new JMSException("This MessageConsumer is waiting in receive() !");
  @@ -85,7 +89,7 @@
                //The QueueReceiver object need to notify their session / connection / the broker               
        }
   
  -    public Message receive() throws JMSException
  +     public Message receive() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is closed");   
                if (messageListener!=null) throw new JMSException("A message listener is already registered");
  @@ -94,7 +98,7 @@
                return null;
        }
   
  -    public Message receive(long timeOut) throws JMSException
  +     public Message receive(long timeOut) throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is closed");   
                if (messageListener!=null) throw new JMSException("A message listener is already registered");
  @@ -103,7 +107,7 @@
                return null;
        }
   
  -    public Message receiveNoWait() throws JMSException
  +     public Message receiveNoWait() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is closed");   
                if (messageListener!=null) throw new JMSException("A message listener is already registered");
  @@ -112,7 +116,7 @@
                return null;
        }
   
  -    public synchronized void close() throws JMSException
  +     public synchronized void close() throws JMSException
        {
                //Job is done in the inherited classes
                //The QueueReceiver object need to notify their session / connection / the broker
  @@ -155,38 +159,18 @@
                                        //the SAME Message object is put in different SessionQueues
                                        //when we deliver it, we have to clone() it to insure independance
                                        SpyMessage message=mes.myClone();
  -                                                                                    
                                                         
  -                                     if (!session.transacted) {
  -                                             if 
(session.acknowledgeMode==Session.CLIENT_ACKNOWLEDGE) {
  -                                                             
  -                                                     synchronized 
(sessionQueue.messagesWaitingForAck) {
  -                                                             //Put the message in 
the messagesWaitForAck queue
  -                                                             
sessionQueue.messagesWaitingForAck.addLast(message);
  -                                                     }
  -                                                             
  -                                                     
message.setSessionQueue(sessionQueue);
  -                                                             
  -                                             } else if 
(session.acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
  -                                                     //DUPS_OK_ACKNOWLEDGE
  -                                             } else {
  -                                                     //AUTO_ACKNOWLEDGE 
  -                                                     //we don't need to keep this 
message in a queue
  -                                             }
  -                                     } else {
  -                                                     
  -                                             //We are linked to a transacted 
session                                                                                
 
  -                                                     
  -                                             synchronized 
(sessionQueue.messagesWaitingForAck) {
  -                                                     //Put the message in the 
messagesWaitForAck queue
  -                                                     
sessionQueue.messagesWaitingForAck.addLast(message);
  -                                             }
  -                                                     
  +                                     message.setSpySession(session);
  +
  +                                     if( session.transacted ) {
  +                                             messagesConsumed.addLast(message.getJMSMessageID());
  +                                     } else if( session.acknowledgeMode==session.AUTO_ACKNOWLEDGE || session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
  +                                             message.doAcknowledge();
                                        }
  -                                             
  +
                                        return message;
                                                                                
                                } catch (Exception e) {
  -                                     Log.error(e);
  +                                     e.printStackTrace();
                                }
   
                        }
  @@ -195,28 +179,47 @@
   
        }
        
  -     void addMessage(SpyMessage mes) throws JMSException
  +     public void addMessage(SpyMessage mes) throws JMSException
        {
  +             //Set the session in the message so it can acknowlege
  +             mes.setSpySession(session);
  +                     
                synchronized (messages) {
                        //Add a message to the queue
  -                     
  -                     //Test the priority
  -                     int pri=mes.getJMSPriority();
  -                     
  -                     if (pri<=4) {                           
  -                             //normal priority message
  -                             messages.addLast(mes);
  -                     } else {                                
  -                             //expedited priority message
  -                             int size=messages.size();
  -                             int i=0;                                
  -                             for(;i<size;i++) {
  -                                     if 
(((SpyMessage)messages.get(i)).getJMSPriority()<pri) break;
  -                             }                               
  -                             messages.add(i,mes);                            
  -                     }
  -                     
  +                     messages.addLast(mes);                  
                }
        }
        
  +     
  +     public boolean deliverMessage() throws JMSException {
  +             
  +             synchronized (messages) {
  +                     if (messages.size()==0) 
  +                             return false;
  +                                     
  +                     if (messageListener==null) {
  +                             if (!waitInReceive) 
  +                                     return false;
  +                             messages.notify();
  +                     } else {
  +                             SpyMessage mes=getMessage();
  +                             if (mes==null) 
  +                                     return false;
  +                                     
  +                             messageListener.onMessage(mes);
  +                             if( session.transacted ) {
  +                                     messagesConsumed.addLast(mes.getJMSMessageID());
  +                             } else if( session.acknowledgeMode==session.AUTO_ACKNOWLEDGE || session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
  +                                     mes.doAcknowledge();
  +                             }       
  +                             
  +                     }               
  +             }               
  +             return true;
  +     }       
  +
  +             
  +     public boolean isListening() {
  +             return false;
  +     }
   }
  
  
  
  1.5       +119 -77   spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
  
  Index: SpyQueueReceiver.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- SpyQueueReceiver.java     2000/06/09 20:03:58     1.4
  +++ SpyQueueReceiver.java     2000/11/19 19:59:57     1.5
  @@ -17,7 +17,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class SpyQueueReceiver 
        extends SpyMessageConsumer 
  @@ -32,9 +32,10 @@
        
        // Constructor ---------------------------------------------------
           
  -    SpyQueueReceiver(SpyQueueSession session,Queue queue) 
  +     SpyQueueReceiver(SpyQueueSession session,Queue queue) 
        {
                super(session);
  +             this.destination=queue;
                this.queue=queue;
                listening=false;
        }
  @@ -59,122 +60,163 @@
        }
   
        //Overrides MessageConsumer
  -     
  -    public Message receive() throws JMSException
  -     {
  +     public Message receive() throws JMSException {
                super.receive();
  -                     
  -             setListening(true);
  -             
  +
  +
  +             //if the client follows the specification [4.4.6], he cannot use this session
  +             //to asynchronously receive a message or receive() in another thread.
  +             //If a message is already pending for this session, we can immediatly deliver it 
                synchronized (messages) {
  -                     
  -                     //if the client follows the specification [4.4.6], he cannot 
use this session
  -                     //to asynchronously receive a message or receive() in another 
thread.
  -                     //If a message is already pending for this session, we can 
immediatly deliver it 
  -                     
  -                     while (true) {
  -                             
  -                             if (closed) {
  -                                     setListening(false);
  -                                     return null;
  -                             }
  -                             
  -                             if (!session.modeStop) {
  -                                     Message mes=getMessage();
  -                                     if (mes!=null) {
  -                                             setListening(false);
  -                                             return mes;
  -                                     }
  -                             } else Log.log("the connection is stopped !");
                                
  -                             try {
  -                                     waitInReceive=true;
  +                     waitInReceive = true;                   
  +                     session.connection.queueReceive(queue, 0);
  +
  +                     try {
  +                     
  +                             while (true) {
  +
  +                                     if (!session.modeStop) {
  +
  +                                             Message mes = getMessage();
  +                                             if (mes != null) 
  +                                                     return mes;
  +
  +                                     } else
  +                                             Log.log("the connection is stopped !");
  +
                                        messages.wait();
  -                             } catch (InterruptedException e) {
  -                             } finally {
  -                                     waitInReceive=false;
                                }
  -                             
  -                     }
                        
  +                     } catch (InterruptedException e) {
  +                             JMSException newE = new JMSException("Receive interupted");
  +                             newE.setLinkedException(e);
  +                             throw newE;
  +                     } finally {
  +                             waitInReceive=false;    
  +                     }
                }
  -                             
        }
   
  -    public Message receive(long timeOut) throws JMSException
  +     public Message receive(long timeOut) throws JMSException
        {
                super.receive(timeOut);
  -             
  +                     
                if (timeOut==0) return receive();               
                long endTime=System.currentTimeMillis()+timeOut;
  -             
  -             setListening(true);
   
  +             
  +             //if the client respects the specification [4.4.6], he cannot use this session
  +             //to asynchronously receive a message or receive() from another thread.
  +             //If a message is already pending for this session, we can deliver it 
                synchronized (messages) {
                        
  -                     //if the client respects the specification [4.4.6], he cannot 
use this session
  -                     //to asynchronously receive a message or receive() from 
another thread.
  -                     //If a message is already pending for this session, we can 
deliver it 
  -                                             
  -                     while (true) {
  -                             
  -                             if (closed) {
  -                                     setListening(false);
  -                                     return null;
  -                             }
  +                     waitInReceive=true;
  +                     session.connection.queueReceive(queue,timeOut);
  +                     
  +                     try {
                                
  -                             if (!session.modeStop) {
  -                                     Message mes=getMessage();
  -                                     if (mes!=null) {
  -                                             setListening(false);
  -                                             return mes;
  +                             while (true) {
  +                                             
  +                                     if (!session.modeStop) {
  +                                             Message mes=getMessage();
  +                                             if (mes!=null) {
  +                                                     return mes;
  +                                             }
  +                                             
  +                                     } else 
  +                                             Log.log("the connection is stopped !");
  +                                     
  +                                     long att=endTime-System.currentTimeMillis();
  +                                     if (att<=0) {
  +                                             return null;                           
 
                                        }
  -                             } else Log.log("the connection is stopped !");
  -                             
  -                             long att=endTime-System.currentTimeMillis();
  -                             if (att<=0) {
  -                                     setListening(false);
  -                                     return null;
  -                             }
                                
  -                             try {                                   
  -                                     waitInReceive=true;
                                        messages.wait(att);
  -                             } catch (InterruptedException e) {
  -                             } finally {
  -                                     waitInReceive=false;
                                }
                                
  -                     }
  +                     } catch (InterruptedException e) {
  +                             JMSException newE = new JMSException("Receive interupted");
  +                             newE.setLinkedException(e);
  +                             throw newE;
  +                     } finally {
  +                             waitInReceive=false;    
  +                     }
                }
        
        }
   
  -    public Message receiveNoWait() throws JMSException
  +     public Message receiveNoWait() throws JMSException
        {
  -             super.receiveNoWait();
  -             
  +             super.receiveNoWait();          
                if (session.modeStop) return null;
  -                     
  -             return session.connection.queueReceiveNoWait(queue);
  +             return session.connection.queueReceive(queue,-1);
        }       
   
  -    public void setMessageListener(MessageListener listener) throws JMSException
  +     public void setMessageListener(MessageListener listener) throws JMSException
        {       
                super.setMessageListener(listener);
                
                messageListener=listener;
                setListening(listener!=null);
        }
  -     
  -     //---
        
  +     //---   
        void setListening(boolean newvalue) throws JMSException
        {
                if (newvalue==listening) return;
                listening=newvalue;
  -             if (listening) sessionQueue.changeNumListening(1);
  -             else sessionQueue.changeNumListening(-1);
  +             
  +             session.getConnection().listenerChange(queue);
        }
   
  +     //Called by the ConnectionReceiver which has just received a message - in the Queue case only
  +     public void dispatchMessage(SpyMessage mes) throws JMSException {
  +
  +             if (session.closed)
  +                     throw new NoReceiverException("The session is closed");
  +             if (session.modeStop)
  +                     throw new NoReceiverException("The session is stopped");
  +             if (mes.isOutdated())
  +                     return;
  +                     
  +             //Work with this receiver
  +             if (messageListener == null) {
  +                     synchronized (messages) {
  +                             
  +                             if ( waitInReceive  ) {
  +                                     if( messages.size()==0 ) {
  +                                             addMessage(mes);
  +                                             messages.notify();
  +                                     } else {
  +                                             Log.notice("Got too many messages for one receive.!");
  +                                             throw new NoReceiverException("Got too many messages for one receive.!");
  +                                     }
  +                             } else {
  +                                     Log.notice("Message did not arrive in time for the receive!");
  +                                     throw new NoReceiverException("Message did not arrive in time for the receive!");                       
  +                             }
  +                                                     
  +                     }
  +             } else {
  +                     
  +                     if (!isListening())
  +                             throw new NoReceiverException("The receiver is not longer listening!");
  +                             
  +                     //Set the session in the message so it can acknowlege
  +                     mes.setSpySession(session);
  +                     messageListener.onMessage(mes);
  +
  +                     if( session.transacted ) {
  +                             messagesConsumed.addLast(mes.getJMSMessageID());
  +                     } else if( session.acknowledgeMode==session.AUTO_ACKNOWLEDGE || session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
  +                             mes.doAcknowledge();
  +                     }       
  +             }
  +             
  +     }       
  +
  +     public boolean isListening() {
  +             return listening;
  +     }
   }
  
  
  
  1.5       +2 -8      spyderMQ/src/java/org/spydermq/SpyQueueSession.java
  
  Index: SpyQueueSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- SpyQueueSession.java      2000/11/14 06:15:53     1.4
  +++ SpyQueueSession.java      2000/11/19 19:59:57     1.5
  @@ -23,7 +23,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class SpyQueueSession 
        extends SpySession 
  @@ -68,7 +68,7 @@
                if (closed) throw new IllegalStateException("The session is closed");  
         
   
                SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue);
  -             SessionQueue sessionQueue=addConsumer(queue,receiver);
  +             addConsumer(queue,receiver);
                
                return receiver;
        }
  @@ -95,13 +95,7 @@
                return ((SpyQueueConnection)connection).getTemporaryQueue();
        }
   
  -     //Not part of the spec
  -     
  -     //Called by the ConnectionReceiver object : put a new msg in the receiver's queue
  -     public void dispatchMessage(Destination dest,SpyMessage mes) throws JMSException
  -     {
  -             //Done in the SessionQueue :)
  -     }
  +
        
        
        // Package protected ---------------------------------------------
  
  
  
  1.12      +97 -100   spyderMQ/src/java/org/spydermq/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- SpySession.java   2000/11/14 06:15:53     1.11
  +++ SpySession.java   2000/11/19 19:59:57     1.12
  @@ -29,7 +29,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.11 $
  + *   @version $Revision: 1.12 $
    */
   public class SpySession 
        implements Runnable, Session
  @@ -45,8 +45,7 @@
        private MessageListener messageListener;
        //The connection object to which this session is linked
        protected SpyConnection connection;
  -     //HashMap of SessionQueue by Destination
  -     public HashMap destinations;
  +
        //The outgoing message queue 
        protected LinkedList outgoingQueue;
        //The outgoing message queue for messages that have been commited (if the session is transacted)
  @@ -61,15 +60,17 @@
        public boolean alphaMode;
        // Should we do client side persistence?
        public boolean clientPersistence = true;
  +     //MessageConsumers created by this session
  +     protected HashSet consumers;
        
        // Constructor ---------------------------------------------------             
    
   
        SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean stop)
        { 
  +
                connection=conn;
                transacted=trans;
                acknowledgeMode=acknowledge;
  -             destinations=new HashMap();
                outgoingQueue=new LinkedList();
                outgoingCommitedQueue=new LinkedList();
                modeStop=stop;
  @@ -77,15 +78,17 @@
                closed=false;
                mutex=new Mutex();
                alphaMode=true;
  +             consumers = new HashSet();
                
                //Start my thread 
  -             Thread oneThread=new Thread(this);
  +             Thread oneThread=new Thread(this, "SpySession");
                oneThread.setDaemon(true);
                oneThread.start();
                
                //Wait for the thread to sleep
                mutex.waitLocked();
                
  +             
        }
   
        // Public --------------------------------------------------------
  @@ -231,18 +234,19 @@
                                        Log.error(e);
                                }
                        }
  -                             
  -                     //if we are not in stopped mode, look at the incoming queue    
                         
  -                             
  -                     if (!modeStop) {
  -                                             
  -                             Collection values = destinations.values();
  -                             Iterator i=values.iterator();
  -                             while (i.hasNext()) {
  -                                     SessionQueue 
sessionQueue=(SessionQueue)i.next();
  -                                     doneJob=doneJob||sessionQueue.deliverMessage();
  +
  +                     try {   
  +                             //if we are not in stopped mode, look at the incoming 
queue                                                             
  +                             if (!modeStop) {
  +                                     Iterator i=consumers.iterator();
  +                                     while (i.hasNext()) {
  +                                             SpyMessageConsumer 
mc=(SpyMessageConsumer)i.next();
  +                                             doneJob=doneJob||mc.deliverMessage();
  +                                     }                                       
                                }
  -                                     
  +                     } catch (JMSException e) {
  +                             Log.log("Cannot receive a message from the 
provider...");
  +                             Log.error(e);
                        }
                                        
                        //If there was smthg to do, try again
  @@ -273,22 +277,16 @@
                
                //notify the sleeping synchronous listeners
                
  -             Collection values = destinations.values();
  -             Iterator i=values.iterator();
  +             Iterator i=consumers.iterator();
                while (i.hasNext()) {
  -                     SessionQueue sessionQueue=(SessionQueue)i.next();
  -                     sessionQueue.close();
  +                     SpyMessageConsumer 
messageConsumer=(SpyMessageConsumer)i.next();
  +                     messageConsumer.close();
                }       
                
                connection.sessionClosing(this);
        }
   
  -     //Called by the ConnectionReceiver which has just received a message
  -     public void dispatchMessage(Destination dest, SpyMessage mes) throws 
JMSException
  -     {
  -             //The job is done in inherited classes - in the SPyTopicSession only
  -             throw new RuntimeException("pure virtual call");
  -     }       
  +     
        
        //Commit a transacted session
        public synchronized void commit() throws JMSException
  @@ -328,16 +326,11 @@
                        //Move the outgoing messages from the outgoingQueue to the 
outgoingCommitedQueue
                        outgoingCommitedQueue.addAll(outgoingQueue);
                        outgoingQueue.clear();
  +
  +                     //Acknowlege all consumed messages
  +                     SpyAcknowledgementItem items[] = removeAcknowledgementItems();
  +                     connection.acknowledge(items, true);
                        
  -                     //Notify each SessionQueue that we are going to commit
  -                     Collection values = destinations.values();
  -                     Iterator i=values.iterator();
  -                     while (i.hasNext()) {
  -                             SessionQueue sessionQueue=(SessionQueue)i.next();
  -                             sessionQueue.commit();
  -                     }       
  -                     
  -                             
                        //We have finished our work, we can wake up the thread
                        modeStop=modeSav;
                        mutex.notifyLock();
  @@ -365,13 +358,9 @@
                        //Clear the outgoing queue
                        outgoingQueue.clear();
                        
  -                     //Notify each SessionQueue that we are going to rollback
  -                     Collection values = destinations.values();
  -                     Iterator i=values.iterator();
  -                     while (i.hasNext()) {
  -                             SessionQueue sessionQueue=(SessionQueue)i.next();
  -                             sessionQueue.recover();
  -                     }       
  +                     //Neg Acknowlege all consumed messages
  +                     SpyAcknowledgementItem items[] = removeAcknowledgementItems();
  +                     connection.acknowledge(items, false);
                        
                        //We have finished our work, we can wake up the thread
                        modeStop=modeSav;
  @@ -393,14 +382,10 @@
                synchronized (mutex) {
                        
                        mutex.waitToSleep();
  -                     
  -                     //Notify each SessionQueue that we are going to recover
  -                     Collection values = destinations.values();
  -                     Iterator i=values.iterator();
  -                     while (i.hasNext()) {
  -                             SessionQueue sessionQueue=(SessionQueue)i.next();
  -                             sessionQueue.recover();
  -                     }       
  +                             
  +                     //Neg Acknowlege all consumed messages
  +                     SpyAcknowledgementItem items[] = removeAcknowledgementItems();
  +                     connection.acknowledge(items, false);
                        
                        //We have finished our work, we can wake up the thread
                        modeStop=modeSav;
  @@ -414,64 +399,16 @@
        {       
                Log.log("SpySession: deleteDestination(dest="+dest.toString()+")");
                
  -             //Remove it from the subscribers list
  -             synchronized (destinations) {
  -                     HashMap newMap=(HashMap)destinations.clone();   
  +             synchronized (consumers) {
  +                     HashSet newMap=(HashSet)consumers.clone();      
                        newMap.remove(dest);
  -                     destinations=newMap;
  +                     consumers=newMap;
                }
                
                //We could look at our incoming and outgoing queues to drop messages
        }
        
  -     // Package protected ---------------------------------------------
  -         
  -     SessionQueue addConsumer(Destination dest, SpyMessageConsumer who) throws 
JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The session is closed");
  -                                                                             
  -             Log.log("Session: 
subscribe(dest="+dest.toString()+",MessageConsumer="+who.toString()+")");
  -             
  -             synchronized (destinations) {
  -                     SessionQueue sub=(SessionQueue)destinations.get(dest);
  -                     if (sub==null) {
  -                             sub=new SessionQueue(this,dest);
  -                             sub.addConsumer(who);
  -                             HashMap newDestinations=(HashMap)destinations.clone();
  -                             newDestinations.put(dest,sub);
  -                             destinations=newDestinations;
  -                             connection.addSession(dest,this);
  -                     } else {
  -                             sub.addConsumer(who);
  -                     }               
  -                     return sub;
  -             }
  -     }
  -
  -     void removeConsumer(Destination dest, SpyMessageConsumer who) throws 
JMSException
  -     {
  -             Log.log("Session: 
removeConsumer(Destination="+dest.toString()+",MessageConsumer="+who.toString()+")");
  -             
  -             synchronized (destinations) {
  -                     SessionQueue sub=(SessionQueue)destinations.get(dest);
  -                     if (sub!=null) {                                               
                 
  -                             boolean empty=sub.removeConsumer(who);
  -                             if (empty) {
  -                                     HashMap 
newDestinations=(HashMap)destinations.clone();
  -                                     newDestinations.remove(dest);
  -                                     destinations=newDestinations;
  -                                     connection.removeSession(dest,this);
  -                             } 
  -                     } else {
  -                             //this should not happen
  -                             HashMap newDestinations=(HashMap)destinations.clone();
  -                             newDestinations.remove(dest);
  -                             destinations=newDestinations;
  -                     }
  -             }                       
                
  -     }
  -             
        String getNewMessageID() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
  @@ -503,4 +440,64 @@
                
        }
                
  +     void removeConsumer(Destination dest, SpyMessageConsumer who) throws 
JMSException
  +     {
  +             Log.log("Session: 
removeConsumer(Destination="+dest.toString()+",MessageConsumer="+who.toString()+")");
  +             
  +             consumers.remove( who );
  +             
  +             connection.removeConsumer(dest, who );          
  +     }
  +     
  +     void addConsumer(Destination dest, SpyMessageConsumer who) throws JMSException
  +     {
  +             if (closed) throw new IllegalStateException("The session is closed");
  +                                                                             
  +             Log.log("Session: 
subscribe(dest="+dest.toString()+",MessageConsumer="+who.toString()+")");
  +             connection.addConsumer(dest, who);
  +
  +             consumers.add( who );
  +             
  +     }
  +
  +     
  +     /**
  +      * @return org.spydermq.SpyConnection
  +      */
  +     public SpyConnection getConnection() {
  +             return connection;
  +     }
  +     
  +
  +     //Get all the messages that have been consumed but need acks
  +     private SpyAcknowledgementItem[] removeAcknowledgementItems() 
  +     {
  +             //Count the messages
  +             int i=0;
  +             Iterator iter=consumers.iterator();
  +             while (iter.hasNext()) {
  +                     SpyMessageConsumer mc=(SpyMessageConsumer)iter.next();
  +                     i += mc.messagesConsumed.size();
  +             }       
  +             
  +             //Now fill  the array
  +             SpyAcknowledgementItem items[] = new SpyAcknowledgementItem[i];
  +             i=0;
  +             iter=consumers.iterator();
  +             while (iter.hasNext()) {
  +                     SpyMessageConsumer mc=(SpyMessageConsumer)iter.next();
  +                     Iterator mesIter = mc.messagesConsumed.iterator();
  +                     while(mesIter.hasNext()) {
  +                             String messageId = (String)mesIter.next();
  +                             SpyAcknowledgementItem item = new 
SpyAcknowledgementItem();
  +                             item.jmsDestination = mc.destination;
  +                             item.jmsMessageID = messageId;
  +                             items[i++] = item;
  +                     }
  +                     mc.messagesConsumed.clear();
  +             }
  +             
  +             return items;
  +     }
  +     
   }
  
  
  
  1.7       +1 -24     spyderMQ/src/java/org/spydermq/SpyTopicSession.java
  
  Index: SpyTopicSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSession.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- SpyTopicSession.java      2000/11/14 06:15:54     1.6
  +++ SpyTopicSession.java      2000/11/19 19:59:57     1.7
  @@ -28,7 +28,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.6 $
  + *   @version $Revision: 1.7 $
    */
   public class SpyTopicSession 
        extends SpySession 
  @@ -63,7 +63,7 @@
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                
                SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal);
  -             SessionQueue sessionQueue=addConsumer(topic,sub);
  +             addConsumer(topic,sub);
                
                if (messageSelector!=null) {
                        Selector selector=new Selector(messageSelector);        
  @@ -108,30 +108,7 @@
        }
        
        
  -     // - Package protected ---------------------------------------------
  -     // - Not part of the spec
  -     
  -     //Called by the ConnectionReceiver object : put a new msg in the receiver's 
queue
  -     public void dispatchMessage(Destination dest,SpyMessage mes) throws 
JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The session is closed");  
         
  -                                                                             
  -             Log.log("Session: 
dispatchMessage(Destination="+dest.toString()+",Mes="+mes.toString()+")");
  -             
  -             if (mes.isOutdated()) return;
   
  -             //Get the SessionQueue for this Destination
  -             SessionQueue sessionQueue=(SessionQueue)destinations.get(dest);
  -             if (sessionQueue==null) return;
  -             
  -             //Work on the set of SpyTopicSubscriber for this topic          
  -             Iterator i=sessionQueue.subscribers.iterator();                        
 
  -             while (i.hasNext()) {   
  -                     SpyTopicSubscriber sub=(SpyTopicSubscriber)i.next();
  -                     sub.addMessage(mes);
  -             }
  -             
  -     }
        
            
        //called by a MessageProducer object which needs to publish a message
  
  
  
  1.6       +8 -7      spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java
  
  Index: SpyTopicSubscriber.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- SpyTopicSubscriber.java   2000/06/14 19:20:19     1.5
  +++ SpyTopicSubscriber.java   2000/11/19 19:59:57     1.6
  @@ -19,7 +19,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.5 $
  + *   @version $Revision: 1.6 $
    */
   public class SpyTopicSubscriber 
        extends SpyMessageConsumer 
  @@ -34,9 +34,10 @@
   
        // Constructor ---------------------------------------------------
           
  -    SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean local) 
  +     SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean local) 
        {
                super(session);
  +             destination=topic;
                this.topic=topic;
                this.local=local;
        }
  @@ -49,7 +50,7 @@
                return topic;
        }
   
  -    public boolean getNoLocal() throws JMSException
  +     public boolean getNoLocal() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");                   
                return local;
  @@ -74,7 +75,7 @@
                }
        }
                
  -    public Message receive() throws JMSException
  +     public Message receive() throws JMSException
        {
                super.receive();
                
  @@ -105,7 +106,7 @@
                }
        }
   
  -    public Message receive(long timeOut) throws JMSException
  +     public Message receive(long timeOut) throws JMSException
        {
                super.receive(timeOut);
                
  @@ -144,7 +145,7 @@
                
        }
   
  -    public Message receiveNoWait() throws JMSException
  +     public Message receiveNoWait() throws JMSException
        {
                super.receiveNoWait();
                
  @@ -158,7 +159,7 @@
                }
        }
   
  -    public void setMessageListener(MessageListener listener) throws JMSException
  +     public void setMessageListener(MessageListener listener) throws JMSException
        {       
                super.setMessageListener(listener);
                
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/JMSServerQueueReceiver.java
  
  Index: JMSServerQueueReceiver.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import java.util.HashMap;
  import java.util.Iterator;
  import java.io.Serializable;
  import org.spydermq.distributed.interfaces.ConnectionReceiver;
  import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
  
  /**
   * This class manages a connection receiver for a JMSServerQueue.
   * Keeps track of listening state and unacknowledged messages.
   *      
   *@author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *@version $Revision: 1.1 $
   */
  public class JMSServerQueueReceiver implements Serializable {
  
        // The queue messages will be comming from
        public JMSServerQueue jmsSeverQueue;
        // The connection mewssages will be going to
        public SpyDistributedConnection dc;
        // Used to know if the connection is accepting messages.
        public boolean listeners;
        public int receiveReuquests;
        // Keeps track of the unacknowledged messages send to the connection.
        public transient HashMap unacknowledgedMessages;
  
  
        
        // Consturctor ---------------------------------------------------
        public JMSServerQueueReceiver(JMSServerQueue serverQueue, 
SpyDistributedConnection sdc) {
                jmsSeverQueue = serverQueue;
                dc = sdc;
                listeners = false;
                receiveReuquests = 0;
                unacknowledgedMessages = new HashMap();
  
                Log.log("A ServerQueueReceiver has been created for : " + 
serverQueue.destination + "/" + dc.getClientID());
  
        }
  
        
        void acknowledge(String messageId, boolean ack) {
                SpyMessage m;
                synchronized (unacknowledgedMessages) {
                        m = (SpyMessage) unacknowledgedMessages.remove(messageId);
                }
  
                if (m == null)
                        return;
  
                if (!jmsSeverQueue.isTopic) {
                        // Not sure how we should handle the topic case.
                        // On a negative acknowledge, we don't want to
                        // add it back to the topic since other
                        // receivers might get a duplicate duplicate message.
                } else {
                        // Was it a negative acknowledge??
                        if (!ack) {
                                Log.log("Restoring message: " + m.messageId);
                                jmsSeverQueue.restoreMessage(m);
                        } else {
                                Log.log("Message Ack: " + m.messageId);
                        }
                }
  
        }
  
  
        // The connection is accepting new messages if there
        // is a listener or if the receiver has requested a message.    
        public boolean isListening() {
                return listeners || receiveReuquests > 0;
        }
  
        // Called when we send one message.
        public void removeReceiver() {
                if (receiveReuquests == 0)
                        return;
  
                receiveReuquests--;
                if (receiveReuquests == 0 && !listeners) {
                        jmsSeverQueue.listeners--;
                }
  
        }
  
        // This method gets invoked as a result of a receive() method call
        // on a QueueReceiver
        public void addReceiver(long wait) {
  
                // TODO: figure out a way to make a reciver eligable for
                // wait amount of time.
  
                receiveReuquests++;
                if (receiveReuquests == 1 && !listeners) {
                        jmsSeverQueue.listeners++;
                }
  
        }
  
        public void setListening(boolean value) {
                if (value == listeners)
                        return;
  
                listeners = value;
                if (value && receiveReuquests == 0)
                        jmsSeverQueue.listeners++;
  
                if (!value && receiveReuquests == 0)
                        jmsSeverQueue.listeners--;
        }
  
                        
        void sendMultipleMessages(SpyMessage mes[]) throws Exception {
                Log.log("DISPATCH: " + mes.length + " messages => " + 
dc.getClientID());
                if (!jmsSeverQueue.isTopic) { 
                        synchronized (unacknowledgedMessages) {
                                for (int i = 0; i < mes.length; i++) {
                                        
unacknowledgedMessages.put(mes[i].getJMSMessageID(), mes[i]);
                                }
                        }
                }
                dc.cr.receiveMultiple(jmsSeverQueue.destination, mes);
        }
  
        
        
        void sendOneMessage(SpyMessage mes) throws Exception {
                Log.log("DISPATCH: Message: " + mes + " => " + dc.getClientID());
                if (!jmsSeverQueue.isTopic) { 
                        synchronized (unacknowledgedMessages) {
                                unacknowledgedMessages.put(mes.getJMSMessageID(), mes);
                        }
                }
                removeReceiver();
                dc.cr.receive(jmsSeverQueue.destination, mes);
        }
  
  
        
        public void close() {
  
                Log.log("A ServerQueueReceiver has been closed: " + 
jmsSeverQueue.destination + "/" + dc.getClientID());
  
                if (isListening())
                        jmsSeverQueue.listeners--;
  
                synchronized (unacknowledgedMessages) {
                        Iterator iter = unacknowledgedMessages.values().iterator();
                        Log.log("Restoring " + unacknowledgedMessages.size() + " 
messages");
                        while (iter.hasNext()) {
                                SpyMessage m = (SpyMessage) iter.next();
                                jmsSeverQueue.restoreMessage(m);
                                iter.remove();
                        }
                }
        }
  
  
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyAcknowledgementItem.java
  
  Index: SpyAcknowledgementItem.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import java.io.Serializable;
  import javax.jms.Destination;
  
  import java.lang.Comparable;
  /**
   * Used to Acknowledge sent messages.
   *
   * This class holds the minimum amount of information needed to
   * identify a message to the JMSServer.
   *      
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  public class SpyAcknowledgementItem implements Serializable
  {

        /** Destination of the message being acknowledged; null until assigned. */
        public Destination jmsDestination;

        /** JMS message ID of the message being acknowledged; null until assigned. */
        public String jmsMessageID;

  }
  
  
  

Reply via email to