User: norbert 
  Date: 00/05/31 11:06:50

  Added:       src/java/org/spydermq ConnectionQueue.java JMSServer.java
                        JMSServerQueue.java Log.java
                        NoReceiverException.java SessionQueue.java
                        SpyBytesMessage.java SpyConnection.java
                        SpyConnectionMetaData.java SpyDestination.java
                        SpyDistributedConnection.java
                        SpyEncapsulatedMessage.java SpyMapMessage.java
                        SpyMessage.java SpyMessageConsumer.java
                        SpyMessageProducer.java SpyObjectMessage.java
                        SpyQueue.java SpyQueueBrowser.java
                        SpyQueueConnection.java SpyQueueReceiver.java
                        SpyQueueSender.java SpyQueueSession.java
                        SpySession.java SpyStreamMessage.java
                        SpyTemporaryQueue.java SpyTemporaryTopic.java
                        SpyTextMessage.java SpyTopic.java
                        SpyTopicConnection.java SpyTopicPublisher.java
                        SpyTopicSession.java SpyTopicSubscriber.java
  Log:
  Change the directory name
  
  Revision  Changes    Path
  1.1                  spyderMQ/src/java/org/spydermq/ConnectionQueue.java
  
  Index: ConnectionQueue.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.JMSException;
  import javax.jms.Destination;
  import java.util.HashSet;
  import java.util.Iterator;
  
  /**
   *    This class holds the subscribed sessions. 
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class ConnectionQueue
  {
        // Attributes ----------------------------------------------------
  
        //My destination
        Destination destination;
        //the SpySessions linked to this queue
        public HashSet subscribers;
        //Number of listening sessions 
        public int NumListeningSessions;
        //My SpyConnection
        SpyConnection connection;
  
        // Constructor ---------------------------------------------------
           
        ConnectionQueue(Destination destination,SpyConnection connection)
        {
                subscribers=new HashSet();
                this.connection=connection;
                this.destination=destination;
                NumListeningSessions=0;
        }
  
        // Package protected ---------------------------------------------
  
        synchronized void addSession(SpySession session)
        {
                HashSet newSet=(HashSet)subscribers.clone();
                newSet.add(session);
                subscribers=newSet;
        }
  
        synchronized boolean removeSession(SpySession session)
        {
                HashSet newSet=(HashSet)subscribers.clone();
                newSet.remove(session);
                subscribers=newSet;
                return subscribers.size()==0;
        }
  
        synchronized void changeNumListening(int val) throws JMSException
        {
                NumListeningSessions+=val;
  
                Log.log("ConnectionQueue: 
changeNumListening(sessions="+NumListeningSessions+")");
                
                try {           
                        
                        if (val==-1&&NumListeningSessions==0) {
                                
connection.provider.connectionListening(false,destination,connection.distributedConnection);
                        } else if (val==1&&NumListeningSessions==1) {
                                
connection.provider.connectionListening(true,destination,connection.distributedConnection);
                        }
                        
                } catch (Exception e) {
                        connection.failureHandler(e,"Cannot contact the JMS server");
                }
        
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.JMSException;
  import javax.jms.Destination;
  import javax.jms.TemporaryTopic;
  import javax.jms.TemporaryQueue;
  import javax.jms.Topic;
  import javax.jms.Queue;
  import java.util.LinkedList;
  import java.util.HashMap;
  import java.util.Iterator;
  import org.spydermq.security.SecurityManager;
  
  /**
   *    This class implements the JMS provider
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class JMSServer 
                implements Runnable 
  {
        
        // Constants -----------------------------------------------------
      
        //number of threads in the pool (TO DO: this value should be dynamic)
        final int NB_THREADS=1;
  
        // Attributes ----------------------------------------------------
     
        //messages pending for a Destination ( HashMap of JMSServerQueue objects )
        private HashMap messageQueue;
        //list of tasks pending ( linked list of JMSServerQueue objects )
        LinkedList taskQueue; //look when we unregister a temporaryTopic/Queue
        //last id given to a client 
        private int lastID;     
        //last id given to a temporary topic
        private int lastTemporaryTopic; 
        //last id given to a temporary queue
        private int lastTemporaryQueue; 
        //The security manager
        SecurityManager securityManager;
  
        // Constructor ---------------------------------------------------
     
        public JMSServer(SecurityManager securityManager)
        {
  
                taskQueue=new LinkedList();
                messageQueue=new HashMap();
                
                for(int i=0;i<NB_THREADS;i++) 
                {
                        Thread oneThread=new Thread(this);
                        oneThread.setDaemon(true);
                        oneThread.setName(new Integer(i).toString());
                        oneThread.start();
                }
                
                lastID=1;
                lastTemporaryTopic=1;
                this.securityManager=securityManager;
                
      }
  
        // Public --------------------------------------------------------
  
        //This is a correct threading system, but this is not ideal... 
        //We should let threads cycle through the JMSServerQueue list, and 
synchronized on the queue they are working on.
        
        public void run()
        {
                while (true)
                {
        
                        JMSServerQueue queue=null;                      
                
                        //Wait (and sleep) until it can find something to do
                        synchronized (taskQueue)
                        {
                                while (queue==null) {                                  
 
                                        
                                        // size() is O(1) in LinkedList... 
                                        int size=taskQueue.size(); 
                                        if (size!=0) { 
                                                
queue=(JMSServerQueue)taskQueue.removeFirst();
                                                //One other thread can start working 
on the task queue...
                                                if (size>1) taskQueue.notify();
                                        } else {        
                                                try {
                                                        Log.log("I'm going to bed...");
                                                        taskQueue.wait();
                                                        Log.log("I wake up");
                                                } catch (InterruptedException e) {
                                                }
                                        }
                                        
                                }
                        }
  
                        //Ask the queue to do its job
                        queue.doMyJob();
                        
                }
                        
        }
        
        // Administration calls
        
        public SpyTopic newTopic(String name) throws JMSException
        {
                Log.notice("[JMSServer] new topic : "+name);
        
                SpyTopic newTopic=new SpyTopic(name);
                if (messageQueue.containsKey(newTopic)) throw new JMSException("This 
topic already exists !");
                
                JMSServerQueue queue=new JMSServerQueue(newTopic,null,this);
                
                //Add this new JMSServerQueue to the list
                synchronized (messageQueue) {
                        HashMap newMap=(HashMap)messageQueue.clone();
                        newMap.put(newTopic,queue);     
                        messageQueue=newMap;
                }
        
                return newTopic;
        }
  
                
        public SpyQueue newQueue(String name) throws JMSException
        {
                Log.notice("[JMSServer] new queue : "+name);
        
                SpyQueue newQueue=new SpyQueue(name);
                if (messageQueue.containsKey(newQueue)) throw new JMSException("This 
queue already exists !");
                
                JMSServerQueue queue=new JMSServerQueue(newQueue,null,this);
                
                //Add this new JMSServerQueue to the list
                synchronized (messageQueue) {
                        HashMap newMap=(HashMap)messageQueue.clone();
                        newMap.put(newQueue,queue);     
                        messageQueue=newMap;
                }
        
                return newQueue;
        }
  
        // -----------------------------------------
        // Callbacks for the invocation layer ------
        // -----------------------------------------
        
        //Get a new ClientID for a connection
        public String getID()
        {
                String ID=null;
                
                while (true) {
                        try {
                                ID="ID"+(new Integer(lastID++).toString());
                                securityManager.addClientID(ID);
                                break;
                        } catch (Exception e) {
                        }
                }
                
                return ID;
        }
        
        //A connection has send a new message
      public void newMessage(SpyMessage val[],String id) throws JMSException 
        {
                if (val.length!=1) Log.notice("INCOMING: "+val.length+" messages from 
"+id);
                
                for(int i=0;i<val.length;i++) {
                
                        Log.notice("INCOMING: "+val[i]+" => "+val[i].jmsDestination);
                
                        JMSServerQueue 
queue=(JMSServerQueue)messageQueue.get(val[i].jmsDestination);
                        if (queue==null) throw new JMSException("This destination does 
not exist !");
                
                        //Add the message to the queue
                        queue.addMessage(val[i]);
                }
      }
        
        //A connection object wants to subscribe to a Destination
        public void subscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException
        {
                Log.log("Server: 
subscribe(dest="+dest.toString()+",idConnection="+dc.getClientID()+")");
                
                JMSServerQueue queue=(JMSServerQueue)messageQueue.get(dest);
                if (queue==null) throw new JMSException("This destination does not 
exist !");
                queue.addSubscriber(dc);
        }       
        
        public void unsubscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException
        {
                Log.log("Server: 
unsubscribe(dest="+dest.toString()+",idConnection="+dc.getClientID()+")");
                                
                JMSServerQueue queue=(JMSServerQueue)messageQueue.get(dest);
                if (queue==null) throw new JMSException("This destination does not 
exist !");
                queue.removeSubscriber(dc);
        }
        
        public synchronized SpyTopic createTopic(String name) throws JMSException
        {
                Log.log("createTopic("+name+")");
  
                SpyTopic newTopic=new SpyTopic(name);
                if (!messageQueue.containsKey(newTopic)) throw new JMSException("This 
destination does not exist !");
                return newTopic;                
        }
  
        public synchronized SpyQueue createQueue(String name) throws JMSException
        {
                Log.log("createQueue("+name+")");
  
                SpyQueue newQueue=new SpyQueue(name);
                if (!messageQueue.containsKey(newQueue)) throw new JMSException("This 
destination does not exist !");
                return newQueue;
        }
  
        public synchronized TemporaryTopic getTemporaryTopic(SpyDistributedConnection 
dc)
        {
                SpyTemporaryTopic topic=new SpyTemporaryTopic("JMS_TT"+(new 
Integer(lastTemporaryTopic++).toString()),dc);
  
                synchronized (messageQueue) {
                        JMSServerQueue queue=new JMSServerQueue(topic,dc,this);
                        HashMap newMap=(HashMap)messageQueue.clone();
                        newMap.put(topic,queue); 
                        messageQueue=newMap;
                }
                
                return topic;
        }
        
        public synchronized TemporaryQueue getTemporaryQueue(SpyDistributedConnection 
dc)
        {
                SpyTemporaryQueue newQueue=new SpyTemporaryQueue("JMS_TQ"+(new 
Integer(lastTemporaryQueue++).toString()),dc);
  
                synchronized (messageQueue) {
                        JMSServerQueue sessionQueue=new 
JMSServerQueue(newQueue,dc,this);
                        HashMap newMap=(HashMap)messageQueue.clone();
                        newMap.put(newQueue,sessionQueue); 
                        messageQueue=newMap;
                }
                
                return newQueue;
        }
  
        //A connection is closing [error or notification]
        public synchronized void connectionClosing(SpyDistributedConnection 
dc,JMSServerQueue noCheck)
        {
                if (dc==null) return;
                
                //unregister its clientID
                if (dc.getClientID()!=null)
                        securityManager.removeID(dc.getClientID());
                
                //Remove the connection from the subscribers list
                synchronized (messageQueue) {
  
                        HashMap newMap=null;
                        Iterator i=messageQueue.values().iterator();
                        boolean modified=false; //don't waste memory
        
                        while (i.hasNext()) {
                                
                                JMSServerQueue sq=(JMSServerQueue)i.next();
                                                                
                                if (dc.equals(sq.temporaryDestination)) {
                                        if (!modified) 
newMap=(HashMap)messageQueue.clone();
                                        newMap.remove(sq.destination);
                                        modified=true;
                                } else {
                                        if (sq==noCheck) continue;
                                        sq.connectionClosing(dc);
                                }
                                
                        }
                
                        if (modified) messageQueue=newMap;
                }
                
        }
        
        public synchronized void deleteTemporaryDestination(SpyDestination dest)
        {
                Log.log("JMSServer: deleteDestination(dest="+dest.toString()+")");
  
                synchronized (messageQueue) {
                        HashMap newMap=(HashMap)messageQueue.clone();   
                        newMap.remove(dest);
                        messageQueue=newMap;
                }
                
        }
                        
        public void checkID(String ID) throws JMSException
        {
                securityManager.addClientID(ID);
        }
        
        public SpyMessage queueReceiveNoWait(Queue queue) throws JMSException
        {
                Log.log("JMSserver: queueReceiveNoWait(queue="+queue+")");
                
                JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(queue);
                if (serverQueue==null) throw new JMSException("This destination does 
not exist !");
                
                return serverQueue.queueReceiveNoWait();
        }
        
        public void connectionListening(boolean mode,Destination 
dest,SpyDistributedConnection dc) throws JMSException
        {
                JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(dest);
                if (serverQueue==null) throw new JMSException("This destination does 
not exist !");
                
                serverQueue.connectionListening(mode,dc);
        }
  
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/JMSServerQueue.java
  
  Index: JMSServerQueue.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import java.util.Iterator;
  import java.util.Hashtable;
  import java.util.LinkedList;
  import java.util.HashMap;
  
  /**
   *    This class is a message queue which is stored (hashed by Destination) on the 
JMS provider
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class JMSServerQueue
  {
        // Attributes ----------------------------------------------------
  
        //the Destination of this queue
        Destination destination;
        //DistributedConnection objs that have "registered" to this Destination
        private HashMap subscribers;
        //List of Pending messages
        private LinkedList messages;
        //Is a thread already working on this queue ? 
        //You cannot start two threads on the same destination (correct order of msgs)
        private boolean threadWorking;
        // Am I already in the task queue ? It is useless to put many times the same 
destination in the task queue.
        private boolean alreadyInTaskQueue;
        //If this is linked to a temporaryDestination, 
temporaryDestination=DistributedConnection of the owner, otherwise it's null
        SpyDistributedConnection temporaryDestination;
        //The JMSServer object
        private JMSServer server;
        //Am I a queue or a topic  
        boolean isTopic;
        //List of messages waiting for acknowledgment
        private LinkedList messagesWaitingForAck;
        //Nb of listeners for this Queue
        int listeners;
        
        // Constructor ---------------------------------------------------
           
        JMSServerQueue(Destination dest,SpyDistributedConnection temporary,JMSServer 
server)
        {
                destination=dest;
                subscribers=new HashMap();
                messages=new LinkedList();
                threadWorking=false;
                alreadyInTaskQueue=false;
                temporaryDestination=temporary;
                this.server=server;
                messagesWaitingForAck=new LinkedList();
                isTopic=dest instanceof SpyTopic;
                listeners=0;
        }
        
        // Package protected ---------------------------------------------
            
        void addSubscriber(SpyDistributedConnection dc) throws JMSException
        {
                //We want to avoid removeSubscriber, addSubscriber or sendOneMessage 
to work concurently
                synchronized (destination) {
                        if 
(temporaryDestination!=null&&!temporaryDestination.equals(dc)) throw new 
JMSException("You cannot subscriber to this temporary destination");
                        subscribers.put(dc.getClientID(),dc);
                }
        }
  
        void removeSubscriber(SpyDistributedConnection dc)
        {
                //We want to avoid removeSubscriber, addSubscriber or sendOneMessage 
to work concurently
                synchronized (destination) {
                        
                        SpyDistributedConnection 
distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
                        if (distributedConnection==null) return;
                        listeners-=distributedConnection.listeners;
                        
                        subscribers.remove(dc);
                                                                        
                }
        }
  
        void addMessage(SpyMessage mes) throws JMSException
        {
                //Add a message to the message list... 
                synchronized (messages) 
                {                       
                        //Add the message to the queue
  
                        //Get the priority
                        int pri=mes.getJMSPriority();                   
                        
                        if (pri<=4) {                           
                                //normal priority message
                                messages.addLast(mes);                          
                        } else {                                
                                //expedited priority message
                                int size=messages.size();
                                int i=0;                                
                                for(;i<size;i++) {
                                        if 
(((SpyMessage)messages.get(i)).getJMSPriority()<pri) break;
                                }                               
                                messages.add(i,mes);
                        }
                        
                        if (isTopic) {
                                //if a thread is already working on this destination, 
I don't have to myself to the taskqueue
                                if (!threadWorking) notifyWorkers();
                        } else {
                                if (listeners!=0&&!threadWorking) notifyWorkers();
                        }
  
                }
        }
  
        //Clear the message queue
        synchronized SpyMessage[] startWork()
        {
                if (threadWorking) throw new RuntimeException("One thread is already 
working !");
                
                synchronized (messages) {
                        SpyMessage[] mes=new SpyMessage[messages.size()];
                        mes=(SpyMessage[])messages.toArray(mes);
                        messages.clear();
                        threadWorking=true;
                        alreadyInTaskQueue=false;
                        return mes;
                }
        }
        
        synchronized SpyMessage startWorkQueue()
        {
                synchronized (messages) {
                        
                        threadWorking=true;
                        alreadyInTaskQueue=false;
  
                        if (messages.size()==0) return null;
                        return (SpyMessage)messages.removeFirst();
                }
        }
        
        void endWork()
        {
                //The thread has finished his work...
                threadWorking=false;            
                
                synchronized (messages) {
                        if (isTopic) {
                                //notify another thread if there is work to do !
                                if (!messages.isEmpty()) notifyWorkers();
                        } else {
                                if (listeners!=0&&!messages.isEmpty()) notifyWorkers();
                        }
                }
        }
  
        void sendOneMessage(SpyMessage mes)
        {
                //we can only add/remove a subscribers once the message is sent ( 
iterator is fail-fast )
                synchronized (subscribers) {
                        if (subscribers.isEmpty()) return;
                                
                        Iterator i=subscribers.values().iterator();
                                
                        while (i.hasNext()) {
                                SpyDistributedConnection 
dc=(SpyDistributedConnection)i.next();
                                try {
                                        dc.cr.receive(destination,mes);
                                } catch (Exception e) {
                                        Log.error("Cannot deliver this message to the 
client "+dc);                                     
                                        Log.error(e);
                                        handleConnectionFailure(dc,i);
                                } 
                        }       
                }
        }
  
        void sendMultipleMessages(SpyMessage mes[])
        {
                synchronized (subscribers) {
                        if (subscribers.isEmpty()) return;
                                
                        Iterator i=subscribers.values().iterator();
                                
                        while (i.hasNext()) {
                                SpyDistributedConnection 
dc=(SpyDistributedConnection)i.next();
                                try {
                                        dc.cr.receiveMultiple(destination,mes);
                                } catch (Exception e) {
                                        Log.error("Cannot deliver those messages to 
the client "+dc);                                   
                                        Log.error(e);
                                        handleConnectionFailure(dc,i);
                                } 
                        }       
                }
        }
        
        //A connection is closing
        void connectionClosing(SpyDistributedConnection dc)
        {
                if (!subscribers.containsKey(dc.getClientID())) return;
                Log.notice("Warning: The DistributedConnection was still registered 
for "+destination);
                removeSubscriber(dc);
        }
        
        void notifyWorkers()
        {
                //It is useless to put many times the same destination in the task 
queue
                if (alreadyInTaskQueue) return;
                
                synchronized (server.taskQueue) {
                        alreadyInTaskQueue=true;
                        server.taskQueue.addLast(this);
                        server.taskQueue.notify();
                }
        }
        
        private void handleConnectionFailure(SpyDistributedConnection dc,Iterator i)
        {
                //We should try again :) This behavior should under control of a 
Failure-Plugin         
                Log.error("I remove this Connection from the subscribers list");
                
                //Call JMSServer.ConnectionClosing(), but ask him not to check my list.
                server.connectionClosing(dc,this);
                
                //remove this connection from the list
                if (i!=null) i.remove();
                else subscribers.remove(dc.getClientID());
        }
        
        void doMyJob()
        {                       
                if (isTopic) {                  
                        
                        //Clear the message queue
                        SpyMessage[] msgs=startWork();
                                
                        //Let the thread do its work
                        if (msgs.length>1) {
                                //We can send multiple messages
                                Log.log("DISPATCH: "+msgs.length+" messages => 
"+destination);
                                sendMultipleMessages(msgs);
                        } else {
                                //Send each message
                                for(int i=0;i<msgs.length;i++) {
                                        SpyMessage message=(SpyMessage)msgs[i];
                                        Log.log("DISPATCH: "+message+" => 
"+destination);
                                        if (!message.isOutdated()) 
sendOneMessage(message);
                                }                       
                        }
                                
                        //Notify that it has finished its work : another thread can 
start working on this queue
                        endWork();
                        
                } else {
                        
                        while (true) {
                                
                                //Get a receiver
                                //NL: We could find a better receiver (load balancing 
?)
                                
                                if (listeners==0) break;                               
 
                                Iterator i=subscribers.values().iterator();
                                SpyDistributedConnection dc=null;
                                while (i.hasNext()) {
                                        dc=(SpyDistributedConnection)i.next();         
                         
                                        if (dc.listeners!=0) break;
                                }
                                if (dc==null||dc.listeners==0) {
                                        listeners=0;
                                        Log.error("WARNING: The listeners count was 
invalid !");
                                        break;
                                }
                                
                                //Get the message
                                SpyMessage mes=startWorkQueue();
                                if (mes==null) break;
                                if (mes.isOutdated()) continue;
                                
                                //Send the message
                                try {
                                        dc.cr.receive(destination,mes);
                                } catch (Exception e) {
                                        Log.error("Cannot deliver this message to the 
client "+dc);                                     
                                        Log.error(e);
                                        handleConnectionFailure(dc,null);
                                } 
                                
                        }
                        
                        //Notify that it has finished its work : another thread can 
start working on this queue
                        endWork();
                                                
                }
        }
        
        SpyMessage queueReceiveNoWait()
        {
                synchronized (messages) {
                        if (messages.size()==0) return null;
                        return (SpyMessage)messages.removeFirst();
                }
        }
                
        void connectionListening(boolean mode,SpyDistributedConnection dc) throws 
JMSException 
        {
                SpyDistributedConnection 
distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
                if (distributedConnection==null) throw new JMSException("This 
DistributedConnection is not registered");
                
                if (mode) {
                        distributedConnection.listeners++;
                        listeners++;
  
                        if (listeners==1&&!threadWorking)
                                synchronized (messages) {
                                        if (!messages.isEmpty()) notifyWorkers();
                                }
                
                } else {
                        distributedConnection.listeners--;
                        listeners--;
                }
  
                Log.log("Listeners for "+destination+" = "+listeners);
                
        }
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/Log.java
  
  Index: Log.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  /**
   * This is a very basic log system,
   * the only functionnality that we need is to be able to shut down the log. 
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class Log
  {
        final static int LOG_EVERYTHING = 1;
        final static int LOG_NOTICE             = 2;
        final static int LOG_ERRORS             = 3;
        
        //Change this line change the verbosity level
        final static int logType = LOG_EVERYTHING;
        
        private static void print(Object obj)
        {
                if (obj instanceof String) System.out.println((String)obj);
                else if (obj instanceof Exception) ((Exception)obj).printStackTrace();
                else System.out.println(obj.toString());
        }
        
        //Logs
        
        public static void log(Object obj)
        {
                if (logType>LOG_EVERYTHING) return;
                print(obj);
        }
  
        //Notice
        
        public static void notice(Object obj)
        {
                if (logType>LOG_NOTICE) return;
                print(obj);
        }
        
        //Errors
        
        public static void error(Object obj)
        {
                print(obj);
        }
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/NoReceiverException.java
  
  Index: NoReceiverException.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  /**
   * This is a very basic log system,
   * the only functionnality that we need is to be able to shut down the log. 
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  
  import javax.jms.JMSException;
  
  public class NoReceiverException
        extends JMSException 
  {
        public NoReceiverException(String reason, String errorCode) 
   {
                super(reason, errorCode);
        }
  
        public NoReceiverException(String reason) {
                super(reason);
        }
  
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SessionQueue.java
  
  Index: SessionQueue.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.MessageListener;
  import javax.jms.MessageConsumer;
  import javax.jms.JMSException;
  import javax.jms.Session;
  import javax.jms.Destination;
  import java.util.LinkedList;
  import java.util.HashSet;
  import java.util.Iterator;
  import org.spydermq.selectors.Selector;
  
  /**
   *    This class is a message queue which is stored (hashed by Destination) in the 
SpySession object
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SessionQueue
  {
        // Attributes ----------------------------------------------------
  
        //My destination
        Destination destination;
        //List of messages waiting for acknoledgment
        LinkedList messagesWaitingForAck;
        //the MessageConsumers linked to this queue
        public HashSet subscribers;
        //Number of listening receivers 
        public int NumListeningSubscribers;
        //My SpySession
        SpySession session;
  
        // Constructor ---------------------------------------------------
           
        SessionQueue(SpySession session,Destination destination)
        {
                messagesWaitingForAck=new LinkedList();
                subscribers=new HashSet();
                this.session=session;
                this.destination=destination;
                NumListeningSubscribers=0;
        }
  
        // Package protected ---------------------------------------------
            
        
        //Send a message from the queue to a MessageConsumer
        boolean deliverMessage()
        {
                boolean result=false;
                Iterator i=subscribers.iterator();
                
                while (i.hasNext()) {
                        
                        SpyMessageConsumer consumer=(SpyMessageConsumer)i.next();
  
                        synchronized (consumer.messages) {
                                
                                if (consumer.messages.size()==0) continue;
                                        
                                if (consumer.messageListener==null) {
                                        if (!consumer.waitInReceive) continue;
                                        consumer.messages.notify();
                                } else {
                                        SpyMessage mes=consumer.getMessage();
                                        if (mes==null) return false;
                                        consumer.messageListener.onMessage(mes);
                                }
                                
                                result=true;                                    
                                                                        
                        }
                
                }
                
                return result;
        }
        
        //A message has been acknowledged
        void acknowledge(SpyMessage mes)
        {
                Log.log("SessionQueue: acknowledge("+mes.toString()+")");
                
                synchronized (messagesWaitingForAck) {
                        
                        int pos=messagesWaitingForAck.indexOf(mes);
                        
                        if (pos==-1) return;
                        
                        for(int i=0;i<=pos;i++)
                                messagesWaitingForAck.removeFirst();
                        
                }                               
                
        }
        
        //notify the sleeping synchronous listeners
        void close() throws JMSException 
        {
                Iterator i=subscribers.iterator();
                
                while (i.hasNext()) {                   
                        SpyMessageConsumer consumer=(SpyMessageConsumer)i.next();
                        consumer.close();
                }
                
        }
  
        //the session is about to recover
        void recover() throws JMSException
        {
                synchronized (messagesWaitingForAck) {
                                
                        while (messagesWaitingForAck.size()!=0) {
                                        
                                //Get the most recent unacknowledged message
                                SpyMessage 
mes=(SpyMessage)messagesWaitingForAck.removeLast();
                                        
                                //This message is redelivered                          
         
                                mes.setJMSRedelivered(true);
                                        
                                //Put the message in one incoming queue - Is it what 
the spec says ?
                                Iterator i=subscribers.iterator();
                
                                if (i.hasNext()) {                      
                                        SpyMessageConsumer 
consumer=(SpyMessageConsumer)i.next();
                                        consumer.addMessage(mes);
                                }
                                        
                        }
                                
                }
        }
  
        //the session is about to commit, we have to clear our messagesWaitForAck queue
        void commit()
        {
                synchronized (messagesWaitingForAck) {
                        messagesWaitingForAck.clear();
                }
        }       
        
        synchronized void addConsumer(SpyMessageConsumer consumer)
        {
                consumer.setSessionQueue(this);
                HashSet newSet=(HashSet)subscribers.clone();
                newSet.add(consumer);
                subscribers=newSet;
        }
  
        synchronized boolean removeConsumer(MessageConsumer consumer)
        {
                HashSet newSet=(HashSet)subscribers.clone();
                newSet.remove(consumer);
                subscribers=newSet;
                return subscribers.size()==0;
        }
  
        synchronized void changeNumListening(int val) throws JMSException
        {
                NumListeningSubscribers+=val;
                
                if 
((val==-1&&NumListeningSubscribers==0)||(val==1&&NumListeningSubscribers==1)) {
                        ConnectionQueue 
connectionQueue=(ConnectionQueue)session.connection.destinations.get(destination);
                        if (connectionQueue==null) throw new JMSException("There is NO 
ConnectionQueue for this Destination in the SpyConnection !");
                        connectionQueue.changeNumListening(val);
                }
        
        }
                
        public void dispatchMessage(Destination dest, SpyMessage mes) throws 
JMSException
        {       
                Log.log("SessionQueue: 
dispatchMessage(Destination="+dest.toString()+",Mes="+mes.toString()+")");
                
                if (session.closed) throw new IllegalStateException("The session is 
closed");
                if (NumListeningSubscribers==0) throw new NoReceiverException("There 
are no receivers for this destination !");
                if (mes.isOutdated()) return;
                
                Iterator i=subscribers.iterator();              
                SpyQueueReceiver receiver=null;
                while (i.hasNext()) {
                        receiver=(SpyQueueReceiver)i.next();
                        if (receiver.listening) break;
                }
                if (receiver==null||!receiver.listening) {
                        NumListeningSubscribers=0;
                        Log.error("WARNING: The listeners count was invalid !");
                        throw new NoReceiverException("There are no receivers for this 
destination !");
                }
  
                synchronized (receiver.messages) {
                                
                        if (receiver.messageListener==null) {
                                if (!receiver.waitInReceive) throw new 
NoReceiverException("The receiver is not waiting for a message !"); //Try someone else 
in the same session
                                receiver.addMessage(mes);
                                receiver.messages.notify();
                        } else {
                                receiver.addMessage(mes);                              
 
                                receiver.messageListener.onMessage(mes);
                        }
                                                                                       
                 
                }
  
                
        }                                                                       
                
  
  
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyBytesMessage.java
  
  Index: SpyBytesMessage.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.BytesMessage;
  import javax.jms.JMSException;
  import javax.jms.MessageFormatException;
  import javax.jms.MessageNotWriteableException;
  import java.io.IOException;
  import java.io.ByteArrayOutputStream;
  import java.io.DataOutputStream;
  import java.io.ByteArrayInputStream;
  import java.io.DataInputStream;
  
  /**
   *    This class implements javax.jms.BytesMessage
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyBytesMessage 
                extends SpyMessage 
                implements Cloneable, BytesMessage
  {
        
        // Attributes ----------------------------------------------------
  
        private ByteArrayOutputStream ostream=null;
        private DataOutputStream p=null;
        private byte[] InternalArray=null;      
        private ByteArrayInputStream istream=null;
        private DataInputStream m=null;
  
        // Constructor ---------------------------------------------------
           
        SpyBytesMessage()
        {
                msgReadOnly=false;
                ostream = new ByteArrayOutputStream();
                p = new DataOutputStream(ostream);
        }
  
        // Public --------------------------------------------------------
  
      public boolean readBoolean() throws JMSException 
        {
                checkRead();
                try {           
                        return m.readBoolean();
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public byte readByte() throws JMSException 
        {
                checkRead();    
                try {           
                        return m.readByte();
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public int readUnsignedByte() throws JMSException 
        {
                checkRead();
                try {           
                        return m.readUnsignedByte();
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public short readShort() throws JMSException 
        {
                checkRead();
                try {           
                        return m.readShort();
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
   
      public int readUnsignedShort() throws JMSException 
        {
                checkRead();
                try {           
                        return m.readUnsignedShort();
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public char readChar() throws JMSException 
        {
                checkRead();
                try {           
                        return m.readChar();
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public int readInt() throws JMSException 
        {
                checkRead();
                try {           
                        return m.readInt();
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public long readLong() throws JMSException 
        {
                checkRead();
                try {           
                        return m.readLong();
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public float readFloat() throws JMSException 
        {
                checkRead();
                try {           
                        return m.readFloat();
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
      
      public double readDouble() throws JMSException 
        {
                checkRead();
                try {           
                        return m.readDouble();
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public String readUTF() throws JMSException 
        {
                checkRead();
                try {           
                        return m.readUTF();
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
      
      public int readBytes(byte[] value) throws JMSException 
        {
                checkRead();
                try {           
                        return m.read(value);
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public int readBytes(byte[] value, int length) throws JMSException 
        {
                checkRead();
                try {           
                        return m.read(value,0,length);
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public void writeBoolean(boolean value) throws JMSException 
        {
                if (msgReadOnly) throw new MessageNotWriteableException("the message 
body is read-only");
                try {           
                        p.writeBoolean(value);
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public void writeByte(byte value) throws JMSException 
        {
                if (msgReadOnly) throw new MessageNotWriteableException("the message 
body is read-only");
                try {           
                        p.writeByte(value);
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public void writeShort(short value) throws JMSException 
        {
                if (msgReadOnly) throw new MessageNotWriteableException("the message 
body is read-only");
                try {           
                        p.writeShort(value);
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
      
      public void writeChar(char value) throws JMSException 
        {
                if (msgReadOnly) throw new MessageNotWriteableException("the message 
body is read-only");
                try {           
                        p.writeChar(value);
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
        
      public void writeInt(int value) throws JMSException 
        {
                if (msgReadOnly) throw new MessageNotWriteableException("the message 
body is read-only");
                try {           
                        p.writeInt(value);
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public void writeLong(long value) throws JMSException 
        {
                if (msgReadOnly) throw new MessageNotWriteableException("the message 
body is read-only");
                try {           
                        p.writeLong(value);
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public void writeFloat(float value) throws JMSException 
        {
                if (msgReadOnly) throw new MessageNotWriteableException("the message 
body is read-only");
                try {           
                        p.writeFloat(value);
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public void writeDouble(double value) throws JMSException 
        {
                if (msgReadOnly) throw new MessageNotWriteableException("the message 
body is read-only");
                try {           
                        p.writeDouble(value);
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public void writeUTF(String value) throws JMSException 
        {
                if (msgReadOnly) throw new MessageNotWriteableException("the message 
body is read-only");
                try {           
                        p.writeUTF(value);
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
  
      public void writeBytes(byte[] value) throws JMSException 
        {
                if (msgReadOnly) throw new MessageNotWriteableException("the message 
body is read-only");
                try {           
                        p.write(value,0,value.length);
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
   
      public void writeBytes(byte[] value, int offset, int length) throws JMSException 
        {
                if (msgReadOnly) throw new MessageNotWriteableException("the message 
body is read-only");
                try {           
                        p.write(value,offset,length);
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
        }
        
      public void writeObject(Object value) throws JMSException 
        {
                if (msgReadOnly) throw new MessageNotWriteableException("the message 
body is read-only");
                try {           
                        if (value instanceof String) p.writeChars((String)value);
                        else if (value instanceof Boolean) 
p.writeBoolean(((Boolean)value).booleanValue());
                        else if (value instanceof Byte) 
p.writeByte(((Byte)value).byteValue());
                        else if (value instanceof Short) 
p.writeShort(((Short)value).shortValue());
                        else if (value instanceof Integer) 
p.writeInt(((Integer)value).intValue());
                        else if (value instanceof Long) 
p.writeLong(((Long)value).longValue());
                        else if (value instanceof Float) 
p.writeFloat(((Float)value).floatValue());
                        else if (value instanceof Double) 
p.writeDouble(((Double)value).doubleValue());
                        else if (value instanceof byte[]) 
p.write((byte[])value,0,((byte[])value).length);
                        else throw new MessageFormatException("Invalid object for 
properties"); 
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }                               
                
        } 
  
      public void reset() throws JMSException 
        {
                try {                           
                        if (!msgReadOnly) {
                                p.flush();
                                InternalArray=ostream.toByteArray();
                                ostream.close();
                        }
                        ostream=null;
                        istream=null;
                        m=null;
                        p=null;
                        msgReadOnly = true;
                } catch (IOException e) {
                        throw new JMSException("IOException");
                }               
        }
        
      public void clearBody() throws JMSException
        {
                try {   
                        if (!msgReadOnly) ostream.close();
                        else istream.close();
                } catch (IOException e) {
                        //don't throw an exception
                }               
                
                ostream=new ByteArrayOutputStream();
                p=new DataOutputStream(ostream);
                InternalArray=null;     
                istream=null;
                m=null;
                
                super.clearBody();
        }
  
        // Package protected ---------------------------------------------  
        
        //We need to reset() since this message is going to be cloned/serialized
        SpyMessage myClone()
        {
                try {
                        reset();                        
                        return (SpyMessage)clone();
                } catch (Exception e) {                 
                        throw new RuntimeException("myClone failed !");
                } 
        }
            
        // Private -------------------------------------------------------
  
        private void checkRead() throws JMSException 
        {
                if (!msgReadOnly) throw new MessageNotWriteableException("readByte 
while the buffer is writeonly");
                
                //We have just received/reset() the message, and the client is trying 
to read it
                if (istream==null||m==null) {
                        istream = new ByteArrayInputStream(InternalArray);
                        m = new DataInputStream(istream);       
                }
        }
        
        
        
  
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyConnection.java
  
  Index: SpyConnection.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.Connection;
  import javax.jms.Destination;
  import javax.jms.Topic;
  import javax.jms.Queue;
  import javax.jms.JMSException;
  import javax.jms.ConnectionMetaData;
  import javax.jms.ExceptionListener;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Collection;
  import java.util.Properties;
  import java.util.Iterator;
  import java.io.Serializable;
  import java.io.FileInputStream;
  import org.spydermq.distributed.interfaces.DistributedJMSServer;
  import org.spydermq.distributed.interfaces.ConnectionReceiver;
  import org.spydermq.distributed.ConnectionReceiverFactory;
  
  /**
   *    This class implements javax.jms.Connection
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyConnection 
                implements Connection, Serializable
  {
  
        // Attributes ----------------------------------------------------
  
        //This is our connection to the JMS server
        protected DistributedJMSServer provider;
        //This is the clientID
        protected String clientID;
        //the distributed object which receives messages from the JMS server
        protected SpyDistributedConnection distributedConnection;
        //HashMap of ConnectionQueue by Destination
        public HashMap destinations;
        //LinkedList of all created sessions by this connection 
        HashSet createdSessions;
        //Last message ID returned
        private int lastMessageID;
        //Is the connection stopped ?
        public boolean modeStop;
        //Is the connection closed ?
        boolean closed;
        //Name of the connectionReceiver class
        String crClassName;     
        //the exceptionListener
        private ExceptionListener exceptionListener;
  
        // Constructor ---------------------------------------------------
           
        SpyConnection(DistributedJMSServer theServer,String cID,String crCN) throws 
JMSException
        {
                //Set the attributes
                provider = theServer;
                destinations=new HashMap();
                createdSessions=new HashSet();
                distributedConnection=null;
                closed=false;
                lastMessageID=0;
                modeStop=true;
                clientID=cID;
                crClassName=crCN;
        }
  
        // Public --------------------------------------------------------
  
        public String getClientID() throws JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");
                return clientID;
        }
  
  
      public void setClientID(String cID) throws JMSException
      {
            if (closed) throw new IllegalStateException("The connection is closed");
                if (clientID!=null) throw new IllegalStateException("The connection 
has already a clientID");
  
                Log.log("SetClientID("+clientID+")");
  
                try {           
                        provider.checkID(cID);
                } catch (JMSException e) {
                        throw e;
                } catch (Exception e) {
                        failureHandler(e,"Cannot connect to the JMSServer");
                }
                        
                clientID=cID;
        }
   
        public ConnectionMetaData getMetaData() throws JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");                
                
                return new SpyConnectionMetaData();
        }
  
        public ExceptionListener getExceptionListener() throws JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");
                if (distributedConnection==null) createReceiver();
                
                return exceptionListener;
        }
  
        public void setExceptionListener(ExceptionListener listener) throws 
JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");
                if (distributedConnection==null) createReceiver();
                
                exceptionListener=listener;
        }
  
        public void start() throws JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");
                if (distributedConnection==null) createReceiver();
                        
                if (!modeStop) return;
                modeStop=false;
  
                changeModeStop(modeStop);
        }
  
      public void stop() throws JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");
                if (distributedConnection==null) createReceiver();
                
                if (modeStop) return;
                modeStop=true;
                
                changeModeStop(modeStop);
        }
  
      public synchronized void close() throws JMSException
        {               
                if (closed) return;
                closed=true;    
  
                //Get an ID / ConnectionReciever
                if (distributedConnection==null) createReceiver();
                
                //notify his sessions
                synchronized (createdSessions) {
                        
                        Iterator i=createdSessions.iterator();
                        while (i.hasNext()) {
                                ((SpySession)i.next()).close();
                        }
                
                }
  
                //Notify the JMSServer that I am closing
                try {
                        provider.connectionClosing(distributedConnection);
                        ConnectionReceiverFactory.close(distributedConnection);
                } catch (Exception e) {
                        failureHandler(e,"Cannot close properly the connection");
                }               
                
        } 
  
        //called by a TemporaryDestination which is going to be deleted()
        public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");
                if (distributedConnection==null) createReceiver();
  
                Log.log("SpyConnection: deleteDestination(dest="+dest.toString()+")");
                
                try {
                        
                        //Remove it from the destinations list
                        synchronized (destinations) {
                                HashMap newMap=(HashMap)destinations.clone();   
                                newMap.remove(dest);
                                destinations=newMap;
                        }
                        
                        //Notify its sessions that this TemporaryDestination is going 
to be deleted()
                        //We could do that only on the Sessions "linked" to this 
Destination
                        synchronized (createdSessions) {
                                
                                Iterator i=createdSessions.iterator();
                                while (i.hasNext()) {
                                        
((SpySession)i.next()).deleteTemporaryDestination(dest);
                                }
  
                        }
                        
                        //Ask the broker to delete() this TemporaryDestination
                        provider.deleteTemporaryDestination(dest);
                        
                } catch (Exception e) {
                        failureHandler(e,"Cannot delete the TemporaryDestination");
                }
                
        }
  
        // Package protected ---------------------------------------------
            
        //Send a message to the provider
        //[We should try to locally dispatch the message...]
        void sendToServer(SpyMessage mes[]) throws JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");                
                if (distributedConnection==null) createReceiver();
                
                try {
                                                                        
                        provider.newMessage(mes,clientID);
                        
                        /*if (mes.jmsDestination instanceof Topic) {
                                //If this message is sent to a topic, we can try to 
deliver it locally
                                
distributedConnection.cr.receive(mes.jmsDestination,mes);
                        }*/
                        
                } catch (Exception e) {
                        failureHandler(e,"Cannot send a message to the JMS provider");
                }
        }
        
        //A Session has created a new MessageConsumer for the Destination dest
        void addSession(Destination dest, SpySession who) throws JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");                
                if (distributedConnection==null) createReceiver();
                                                                         
                Log.log("Connection: addSession(dest="+dest.toString()+")");
                                
                
                try {
  
                        synchronized (destinations) {
                                
                                ConnectionQueue 
connectionQueue=(ConnectionQueue)destinations.get(dest);
                                
                                if (connectionQueue==null) {                    
                                        connectionQueue=new ConnectionQueue(dest,this);
                                        connectionQueue.addSession(who);
                                        HashMap 
newDestinations=(HashMap)destinations.clone();
                                        newDestinations.put(dest,connectionQueue);
                                        destinations=newDestinations;
                                        
provider.subscribe(dest,distributedConnection);                 
                                } else {                        
                                        connectionQueue.addSession(who);               
         
                                }
                        }
  
                } catch (Exception e) {
                        failureHandler(e,"Cannot subscribe to this Destination");
                }       
                
        }               
        
        //The session does not need to recieve the messages to Destination dest
        void removeSession(Destination dest, SpySession who) throws JMSException
        {
                if (distributedConnection==null) createReceiver();
                
                Log.log("Connection: removeSession(dest="+dest.toString()+")");
                
                try {
                        
                        synchronized (destinations) {
                                
                                ConnectionQueue 
connectionQueue=(ConnectionQueue)destinations.get(dest);
                                
                                if (connectionQueue!=null) {
                                        boolean 
empty=connectionQueue.removeSession(who);
                                        if (empty) {
                                                HashMap 
newDestinations=(HashMap)destinations.clone();
                                                newDestinations.remove(dest);
                                                destinations=newDestinations;
                                                
provider.unsubscribe(dest,distributedConnection);
                                        } 
                                } else {
                                        //this should not happen
                                        HashMap 
newDestinations=(HashMap)destinations.clone();
                                        newDestinations.remove(dest);
                                        destinations=newDestinations;
                                        
provider.unsubscribe(dest,distributedConnection);
                                }
                                
                        }
                        
                } catch (Exception e) {
                        failureHandler(e,"Cannot unsubscribe to this destination");
                }
  
        }
        
        //Get a new messageID (creation of a new message)
        String getNewMessageID() throws JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");                
                if (distributedConnection==null) createReceiver();
                                                                         
                return clientID+"-"+(lastMessageID++);
        }
        
        //notify his sessions that he has changed his stopped mode
        synchronized void changeModeStop(boolean newValue)
        {
                synchronized (createdSessions) {
                        
                        Iterator i=createdSessions.iterator();
                        while (i.hasNext()) {
                                ((SpySession)i.next()).notifyStopMode(newValue);
                        }
  
                }
                
        }
        
        //Called by a session when it is closing
        void sessionClosing(SpySession who)
        {
                synchronized (createdSessions) 
                {                       
                        createdSessions.remove(who);
                }
  
                //This session should not be in the "destinations" object anymore. 
                //We could check this, though
        }
        
        SpyMessage queueReceiveNoWait(Queue queue) throws JMSException 
        {
                try {
                        return provider.queueReceiveNoWait(queue);
                } catch (Exception e) {
                        failureHandler(e,"Cannot create a ConnectionReceiver");
                        return null;
                }
        }
        
        // Protected -------------------------------------------------------
  
        //create a new Distributed object which receives the messages for this 
connection
        protected void createReceiver() throws JMSException
        {
                try {
                        if (clientID==null) askForAnID();
                        ConnectionReceiver 
receiver=ConnectionReceiverFactory.createConnectionReceiver(this,crClassName);
                        distributedConnection=new 
SpyDistributedConnection(clientID,receiver);
                } catch (Exception e) {
                        failureHandler(e,"Cannot create a ConnectionReceiver");
                }
        }
  
        //ask the JMS server for a new ID
        protected void askForAnID() throws JMSException
        {
                try {
                        clientID=provider.getID();
                } catch (Exception e) {
                        failureHandler(e,"Cannot get an ID");
                }               
        }
  
        public void failureHandler(Exception e,String reason) throws JMSException
        {
                Log.error(e);
                
                JMSException excep=new JMSException(reason);
                excep.setLinkedException(e);
                
                if (exceptionListener!=null) {
                        synchronized (exceptionListener) {
                                exceptionListener.onException(excep);
                        }
                }
                
                throw excep;
        }
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyConnectionMetaData.java
  
  Index: SpyConnectionMetaData.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.ConnectionMetaData;
  import javax.jms.JMSException;
  import java.util.Enumeration;
  import java.util.Vector;
  
  /**
   *    This class implements javax.jms.ConnectionMetaData
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyConnectionMetaData 
        implements ConnectionMetaData
  {
  
        // Public --------------------------------------------------------
  
        public String getJMSVersion() throws JMSException
        {
                return "1.0";
        }
  
      public int getJMSMajorVersion() throws JMSException
        {
                return 1;
        }
   
      public int getJMSMinorVersion() throws JMSException
        {
                return 0;
        }
  
      public String getJMSProviderName() throws JMSException
        {
                return "JBoss";
        }
  
      public String getProviderVersion() throws JMSException
        {
                return "0.1";
        }
  
      public int getProviderMajorVersion() throws JMSException
        {
                return 0;
        }
   
        public int getProviderMinorVersion() throws JMSException
        {
                return 1;
        }
        
        public Enumeration getJMSXPropertyNames() throws JMSException
        {
                Vector vector=new Vector();
                vector.add("JMSXGroupID");
                vector.add("JMSXGroupSeq");
                return vector.elements();
        }
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyDestination.java
  
  Index: SpyDestination.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.Destination;
  import java.io.Serializable;
  import javax.naming.Referenceable;
  
  /**
   *    This class implements javax.jms.Destination
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyDestination 
        implements Destination, Serializable
  {
        // Attributes ----------------------------------------------------
  
        protected String name;
                        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyDistributedConnection.java
  
  Index: SpyDistributedConnection.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import org.spydermq.distributed.interfaces.ConnectionReceiver;
  import java.io.Serializable;
  
  /**
   *    This class is the broker point of view on a SpyConnection (it contains a 
ConnectionReceiver)
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyDistributedConnection 
        implements Serializable
  {
        private String clientID;
        public ConnectionReceiver cr;
        public transient int listeners;
        
        SpyDistributedConnection(String id,ConnectionReceiver cr_)
        {
                clientID=id;
                cr=cr_;
        }
        
        String getClientID()
        {
                return clientID;
        }
        
        public boolean equals(Object obj)
        {
                if (obj instanceof ConnectionReceiver) return 
cr.equals((ConnectionReceiver)obj);
                if (obj instanceof SpyDistributedConnection) return 
clientID.equals(((SpyDistributedConnection)obj).clientID);
                return false;
        }
  
        public int hashCode()
        {
                if (clientID==null) return 0;
                return clientID.hashCode();
        }       
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyEncapsulatedMessage.java
  
  Index: SpyEncapsulatedMessage.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.Message;
  
  /**
   *    This Message class is used to send a non 'provider-optimized Message' over the 
network [4.4.5]
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyEncapsulatedMessage
        extends SpyMessage
  {
        
        private Message mes;
        
        SpyEncapsulatedMessage(Message m)
        {
                //mes=m.clone();                
        }
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyMapMessage.java
  
  Index: SpyMapMessage.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.MapMessage;
  import javax.jms.JMSException;
  import javax.jms.MessageFormatException;
  import javax.jms.MessageNotWriteableException;
  import java.util.Enumeration;
  import java.util.Hashtable;
   
  /**
   *    This class implements javax.jms.MapMessage
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyMapMessage 
        extends SpyMessage 
        implements MapMessage, Cloneable 
  { 
        // Attributes ----------------------------------------------------
  
        private Hashtable content;
  
        // Constructor ---------------------------------------------------
           
        SpyMapMessage()
        {
                content=new Hashtable();
        }
  
        // Public --------------------------------------------------------
  
        public boolean getBoolean(String name) throws JMSException
        {
                Object value=content.get(name);
                if (value==null) return Boolean.getBoolean(null);
                
                if (value instanceof Boolean) return ((Boolean)value).booleanValue();
                else if (value instanceof String) return 
Boolean.getBoolean((String)value);
                else throw new MessageFormatException("Invalid conversion");
        }
  
      public byte getByte(String name) throws JMSException
        {
                Object value=content.get(name);
                if (value==null) return Byte.parseByte(null);
                
                if (value instanceof Byte) return ((Byte)value).byteValue();
                else if (value instanceof String) return Byte.parseByte((String)value);
                else throw new MessageFormatException("Invalid conversion");
        }
  
      public short getShort(String name) throws JMSException
        {
                Object value=content.get(name);
                if (value==null) return Short.parseShort(null);
                                
                if (value instanceof Byte) return ((Byte)value).shortValue();
                else if (value instanceof Short) return ((Short)value).shortValue();
                else if (value instanceof String) return 
Short.parseShort((String)value);
                else throw new MessageFormatException("Invalid conversion");
        }
        
      public char getChar(String name) throws JMSException
        {
                Object value=content.get(name);
                if (value==null) throw new NullPointerException("Invalid conversion");
                                
                if (value instanceof Character) return ((Character)value).charValue();
                else throw new MessageFormatException("Invalid conversion");
        }
        
      public int getInt(String name) throws JMSException
        {
                Object value=content.get(name);
                if (value==null) return Integer.parseInt(null);
                                
                if (value instanceof Byte) return ((Byte)value).intValue();
                else if (value instanceof Short) return ((Short)value).intValue();
                else if (value instanceof Integer) return ((Integer)value).intValue();
                else if (value instanceof String) return 
Integer.parseInt((String)value);
                else throw new MessageFormatException("Invalid conversion");
        }
        
      public long getLong(String name) throws JMSException
        {
                Object value=content.get(name);
                if (value==null) return Long.parseLong(null);
                                
                if (value instanceof Byte) return ((Byte)value).longValue();
                else if (value instanceof Short) return ((Short)value).longValue();
                else if (value instanceof Integer) return ((Integer)value).longValue();
                else if (value instanceof Long) return ((Long)value).longValue();      
         
                else if (value instanceof String) return Long.parseLong((String)value);
                else throw new MessageFormatException("Invalid conversion");
        }
        
      public float getFloat(String name) throws JMSException
        {
                Object value=content.get(name);
                if (value==null) return Float.parseFloat(null);
                                
                if (value instanceof Float) return ((Float)value).floatValue();
                else if (value instanceof String) return 
Float.parseFloat((String)value);
                else throw new MessageFormatException("Invalid conversion");
        }
  
      public double getDouble(String name) throws JMSException
        {
                Object value=content.get(name);
                if (value==null) return Double.parseDouble(null);
                                
                if (value instanceof Float) return ((Float)value).doubleValue();
                else if (value instanceof Double) return ((Double)value).doubleValue();
                else if (value instanceof String) return 
Double.parseDouble((String)value);
                else throw new MessageFormatException("Invalid conversion");
        }
        
      public String getString(String name) throws JMSException
        {
                Object value=content.get(name);
                if (value==null) return null;
                                
                if (value instanceof Boolean) return ((Boolean)value).toString();
                else if (value instanceof Byte) return ((Byte)value).toString();
                else if (value instanceof Short) return ((Short)value).toString();
                else if (value instanceof Character) return 
((Character)value).toString();
                else if (value instanceof Integer) return ((Integer)value).toString();
                else if (value instanceof Long) return ((Long)value).toString();
                else if (value instanceof Float) return ((Float)value).toString();
                else if (value instanceof Double) return ((Double)value).toString();
                else if (value instanceof String) return (String)value;
                else throw new MessageFormatException("Invalid conversion");
        }
        
      public byte[] getBytes(String name) throws JMSException
        {
                Object value=content.get(name);
                if (value==null) return null;
                if (value instanceof byte[]) return (byte[])value;
                else throw new MessageFormatException("Invalid conversion");           
 
        }
        
      public Object getObject(String name) throws JMSException
        {
                return content.get(name);
        }
      
        public Enumeration getMapNames() throws JMSException
        {               
                return content.keys();
        }
      
        public void setBoolean(String name, boolean value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("Message is 
ReadOnly !");
                content.put(name,new Boolean(value));
        }
      
        public void setByte(String name, byte value) throws JMSException
        {               
                if (msgReadOnly) throw new MessageNotWriteableException("Message is 
ReadOnly !");
                content.put(name,new Byte(value));
        }
  
        public void setShort(String name, short value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("Message is 
ReadOnly !");
                content.put(name,new Short(value));
        }
      
        public void setChar(String name, char value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("Message is 
ReadOnly !");
                content.put(name,new Character(value));
        }
      
        public void setInt(String name, int value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("Message is 
ReadOnly !");
                content.put(name,new Integer(value));
        }
  
        public void setLong(String name, long value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("Message is 
ReadOnly !");
                content.put(name,new Long(value));
        }
  
        public void setFloat(String name, float value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("Message is 
ReadOnly !");
                content.put(name,new Float(value));     
        }
        
      public void setDouble(String name, double value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("Message is 
ReadOnly !");
                content.put(name,new Double(value));    
        }
        
      public void setString(String name, String value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("Message is 
ReadOnly !");
                content.put(name,value);
        }
                                                                                       
                  
      public void setBytes(String name, byte[] value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("Message is 
ReadOnly !");
                content.put(name,value.clone());
        }
   
      public void setBytes(String name, byte[] value, int offset, int length) throws 
JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("Message is 
ReadOnly !");
                
                if (offset+length>value.length) throw new JMSException("Array is too 
small");           
                byte[] temp = new byte[length];
                for(int i=0;i<length;i++) 
                        temp[i]=value[i+offset];
                
                content.put(name,temp);
        }
                                                                                       
                                                                 
      public void setObject(String name, Object value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("Message is 
ReadOnly !");
                
                if (value instanceof Boolean) content.put(name,value);
                else if (value instanceof Byte) content.put(name,value);
                else if (value instanceof Short) content.put(name,value);
                else if (value instanceof Character) content.put(name,value);
                else if (value instanceof Integer) content.put(name,value);
                else if (value instanceof Long) content.put(name,value);
                else if (value instanceof Float) content.put(name,value);
                else if (value instanceof Double) content.put(name,value);
                else if (value instanceof String) content.put(name,value);
                else if (value instanceof byte[]) 
content.put(name,((byte[])value).clone());
                else throw new MessageFormatException("Invalid object type");          
 
        }
                                                                                       
                  
      public boolean itemExists(String name) throws JMSException
        {               
                return content.containsKey(name);
        }
                                                                                
      public void clearBody() throws JMSException
        {
                content=new Hashtable();
                super.clearBody();                      
        }
  
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyMessage.java
  
  Index: SpyMessage.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  
  import javax.jms.Message;
  import javax.jms.JMSException;
  import javax.jms.MessageFormatException;
  import javax.jms.MessageNotWriteableException;
  import javax.jms.Destination;
  import java.util.Enumeration;
  import java.util.Hashtable;
  import java.util.Date;
  import java.io.Serializable;
  
  /**
   *    This class implements javax.jms.Message
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyMessage 
        implements Serializable, Cloneable, Message
  {
  
        // Constants -----------------------------------------------------
            
        static final int DEFAULT_DELIVERY_MODE = -1;
      static final int DEFAULT_PRIORITY = -1;
      static final int DEFAULT_TIME_TO_LIVE = -1;
  
        // Attributes ----------------------------------------------------
  
        //Those attributes are not transient ---------------
        
        //Header fields 
        //Set by send() method
        Destination jmsDestination=null;
        private int jmsDeliveryMode=-1;
        private long jmsExpiration=0;
        private int jmsPriority=-1;
        private String jmsMessageID=null;
        private long jmsTimeStamp=0;
        //Set by the client
        private boolean jmsCorrelationID=true;
        private String jmsCorrelationIDString=null; 
        private byte[] jmsCorrelationIDbyte=null;
        private Destination jmsReplyTo=null;
        private String jmsType=null;
        //Set by the provider
        private boolean jmsRedelivered=false;
        
        //Properties
        private Hashtable prop;
        private boolean propReadWrite;
        
        //Message body
        protected boolean msgReadOnly=false;
        
        //Those attributes are transient ---------------
        
        //For acknowledgment
        private transient SessionQueue mySessionQueue;
        //For the storage in the JMSServerQueue object
        public transient SpyDistributedConnection originalDistributedConnection;
  
        // Constructor ---------------------------------------------------
           
        SpyMessage()
        {
                prop=new Hashtable();
                propReadWrite=true;
                mySessionQueue=null;
        }       
  
        // Public --------------------------------------------------------
  
        public String getJMSMessageID() throws JMSException
        {
                return jmsMessageID;
        }
        
      public void setJMSMessageID(String id) throws JMSException
        {
                jmsMessageID=id;
        }
        
      public long getJMSTimestamp() throws JMSException
        {
                return jmsTimeStamp;
        }
        
      public void setJMSTimestamp(long timestamp) throws JMSException
        {
                jmsTimeStamp=timestamp;
        }
        
      public byte [] getJMSCorrelationIDAsBytes() throws JMSException
        {       
                if (jmsCorrelationID) throw new JMSException("JMSCorrelationID is a 
string");
                return jmsCorrelationIDbyte;
        }
        
      public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException
        {
                jmsCorrelationID=false;                         
                jmsCorrelationIDbyte=(byte[])correlationID.clone();
                jmsCorrelationIDString=null;
        }
        
      public void setJMSCorrelationID(String correlationID) throws JMSException
        {
                jmsCorrelationID=true;
                jmsCorrelationIDString=correlationID;
                jmsCorrelationIDbyte=null;
        }
        
      public String getJMSCorrelationID() throws JMSException
        {
                if (!jmsCorrelationID) throw new JMSException("JMSCorrelationID is an 
array");
                return jmsCorrelationIDString;
        }
        
      public Destination getJMSReplyTo() throws JMSException
        {
                return jmsReplyTo;
        }
        
      public void setJMSReplyTo(Destination replyTo) throws JMSException
        {
                jmsReplyTo=replyTo;
        }
        
      public Destination getJMSDestination() throws JMSException
        {
                return jmsDestination;
        }
        
      public void setJMSDestination(Destination destination) throws JMSException
        {
                jmsDestination=destination;
        }
        
      public int getJMSDeliveryMode() throws JMSException
        {
                return jmsDeliveryMode;
        }
        
      public void setJMSDeliveryMode(int deliveryMode) throws JMSException
        {
                jmsDeliveryMode=deliveryMode;
        }
        
      public boolean getJMSRedelivered() throws JMSException
        {               
                return jmsRedelivered;
        }
        
      public void setJMSRedelivered(boolean redelivered) throws JMSException
        {
                jmsRedelivered=redelivered;
        }
        
      public String getJMSType() throws JMSException
        {               
                return jmsType;
        }
        
      public void setJMSType(String type) throws JMSException
        {
                jmsType=type;
        }
        
      public long getJMSExpiration() throws JMSException
        {               
                return jmsExpiration;
        }
        
      public void setJMSExpiration(long expiration) throws JMSException
        {
                jmsExpiration=expiration;
        }
        
      public int getJMSPriority() throws JMSException
        {
                return jmsPriority;
        }
        
      public void setJMSPriority(int priority) throws JMSException
        {
                jmsPriority=priority;
        }
        
      public void clearProperties() throws JMSException
        {
                prop=new Hashtable();
                propReadWrite=true;
        }
        
      public boolean propertyExists(String name) throws JMSException
        {
                return prop.containsKey(name);
        }
        
      public boolean getBooleanProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) throw new NullPointerException();
                
                if (value instanceof Boolean) return ((Boolean)value).booleanValue();
                else if (value instanceof String) return 
Boolean.getBoolean((String)value);
                else throw new MessageFormatException("Invalid conversion");           
 
        }
        
      public byte getByteProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) throw new NullPointerException();
                
                if (value instanceof Byte) return ((Byte)value).byteValue();
                else if (value instanceof String) return Byte.parseByte((String)value);
                else throw new MessageFormatException("Invalid conversion");    
        }
        
      public short getShortProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) throw new NullPointerException();
                
                if (value instanceof Byte) return ((Byte)value).shortValue();
                else if (value instanceof Short) return ((Short)value).shortValue();
                else if (value instanceof String) return 
Short.parseShort((String)value);
                else throw new MessageFormatException("Invalid conversion");           
 
        }
        
      public int getIntProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) throw new NullPointerException();
                
                if (value instanceof Byte) return ((Byte)value).intValue();
                else if (value instanceof Short) return ((Short)value).intValue();
                else if (value instanceof Integer) return ((Integer)value).intValue();
                else if (value instanceof String) return 
Integer.parseInt((String)value);
                else throw new MessageFormatException("Invalid conversion");
        }
        
      public long getLongProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) throw new NullPointerException();
  
                if (value instanceof Byte) return ((Byte)value).longValue();
                else if (value instanceof Short) return ((Short)value).longValue();
                else if (value instanceof Integer) return ((Integer)value).longValue();
                else if (value instanceof Long) return ((Long)value).longValue();      
         
                else if (value instanceof String) return Long.parseLong((String)value);
                else throw new MessageFormatException("Invalid conversion");
        }
        
      public float getFloatProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) throw new NullPointerException();
  
                if (value instanceof Float) return ((Float)value).floatValue();
                else if (value instanceof String) return 
Float.parseFloat((String)value);
                else throw new MessageFormatException("Invalid conversion");
        }
        
      public double getDoubleProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) throw new NullPointerException();
  
                if (value instanceof Float) return ((Float)value).doubleValue();
                else if (value instanceof Double) return ((Double)value).doubleValue();
                else if (value instanceof String) return 
Double.parseDouble((String)value);
                else throw new MessageFormatException("Invalid conversion");
        }
        
      public String getStringProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) return null;
  
                if (value instanceof Boolean) return ((Boolean)value).toString();
                else if (value instanceof Byte) return ((Byte)value).toString();
                else if (value instanceof Short) return ((Short)value).toString();
                else if (value instanceof Integer) return ((Integer)value).toString();
                else if (value instanceof Long) return ((Long)value).toString();
                else if (value instanceof Float) return ((Float)value).toString();
                else if (value instanceof Double) return ((Double)value).toString();
                else if (value instanceof String) return (String)value;
                else throw new MessageFormatException("Invalid conversion");
        }
        
      public Object getObjectProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                return value;
        }
        
      public Enumeration getPropertyNames() throws JMSException
        {
                return prop.keys();
        }
        
        void CheckPropertyName(String name) throws JMSException
        {
                if (name.regionMatches(false,0,"JMS_",0,4)) {
                        throw new JMSException("Bad property name");
                }
                
                if (name.regionMatches(false,0,"JMSX",0,4)) {
                        if (name.equals("JMSXGroupId")) return;
                        if (name.equals("JMSXGroupSeq")) return;
                        throw new JMSException("Bad property name");
                }
                
        }
        
      public void setBooleanProperty(String name, boolean value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new Boolean(value));
        }
        
      public void setByteProperty(String name, byte value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new Byte(value));
        }
        
      public void setShortProperty(String name, short value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new Short(value));
        }
        
      public void setIntProperty(String name, int value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new Integer(value));
        }
        
      public void setLongProperty(String name, long value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new Long(value));
        }
        
      public void setFloatProperty(String name, float value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new Float(value));
        }
        
      public void setDoubleProperty(String name, double value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new Double(value));
        }
        
      public void setStringProperty(String name, String value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new String(value));
        }
        
      public void setObjectProperty(String name, Object value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                
                if (value instanceof Boolean) prop.put(name,value);
                else if (value instanceof Byte) prop.put(name,value);
                else if (value instanceof Short) prop.put(name,value);
                else if (value instanceof Integer) prop.put(name,value);
                else if (value instanceof Long) prop.put(name,value);
                else if (value instanceof Float) prop.put(name,value);
                else if (value instanceof Double) prop.put(name,value);
                else if (value instanceof String) prop.put(name,value);
                else throw new MessageFormatException("Invalid object type");          
                 
        }
        
      public void clearBody() throws JMSException
        {
                //Inherited classes clear their content here
                msgReadOnly=false;
        }
        
        public void acknowledge() throws JMSException
        {
                //There is no need to acknowledge() this message
                if (mySessionQueue==null) return;
                
                mySessionQueue.acknowledge(this);
        }
  
        // Package protected ---------------------------------------------
            
        void setSessionQueue(SessionQueue sessionQueue)
        {
                mySessionQueue=sessionQueue;
        }
  
        void setReadOnlyMode()
        {
                propReadWrite=false;
                msgReadOnly=true;
        }
        
        SpyMessage myClone() 
        {
                try { 
                        return (SpyMessage)clone();
                } catch (CloneNotSupportedException e) {
                        throw new RuntimeException("myClone failed !");
                }
        }
        
        boolean isOutdated()
        {
                if (jmsExpiration==0) return false;
                long ts=(new Date()).getTime();
                return jmsExpiration<ts;
        }
                        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.MessageConsumer;
  import javax.jms.JMSException;
  import javax.jms.MessageListener;
  import javax.jms.Message;
  import javax.jms.Session;
  import java.util.LinkedList;
  import java.util.Date;
  import org.spydermq.selectors.Selector;
  
  /**
   *    This class implements javax.jms.MessageConsumer
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyMessageConsumer 
        implements MessageConsumer
  {
        // Attributes ----------------------------------------------------
  
        //Link to my session
        protected SpySession session;
        //My message listener (null if none)
        MessageListener messageListener;
        //Am I closed ?
        protected boolean closed;
        //Do I have a selector
        public Selector selector;
        //The message selector
        public String messageSelector;
        //A link to my session queue (in my session)
        protected SessionQueue sessionQueue;
        //List of Pending messages (not yet delivered)
        LinkedList messages;
        //Is the consumer sleeping in a receive() ?
        boolean waitInReceive;
        
        // Constructor ---------------------------------------------------
           
        SpyMessageConsumer(SpySession s)
        {
                session=s;
                messageListener=null;
                closed=false;
                selector=null;
                messageSelector=null;
                messages=new LinkedList();
                waitInReceive=false;
        }
        
        void setSessionQueue(SessionQueue sessionQueue)
        {
                this.sessionQueue=sessionQueue;
        }               
        
        // Public --------------------------------------------------------
  
      public String getMessageSelector() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
                                
                return messageSelector;
        }
  
      public MessageListener getMessageListener() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
                
                return messageListener;
        }
  
      public void setMessageListener(MessageListener listener) throws JMSException
        {       
                //Job is done in the inherited classes
                //The QueueReceiver object need to notify their session / connection / 
the broker
                throw new RuntimeException("pure virtual call");
        }
  
      public Message receive() throws JMSException
        {
                //Job is done in the inherited classes
                //The QueueReceiver object need to notify their session / connection / 
the broker
                throw new RuntimeException("pure virtual call");
        }
  
      public Message receive(long timeOut) throws JMSException
        {
                //Job is done in the inherited classes
                //The QueueReceiver object need to notify their session / connection / 
the broker
                throw new RuntimeException("pure virtual call");
        }
  
      public Message receiveNoWait() throws JMSException
        {
                //Job is done in the inherited classes
                //The QueueReceiver object need to notify their session / connection / 
the broker
                throw new RuntimeException("pure virtual call");
        }
  
      public synchronized void close() throws JMSException
        {
                //Job is done in the inherited classes
                //The QueueReceiver object need to notify their session / connection / 
the broker
                throw new RuntimeException("pure virtual call");
        }
        
        //Package protected - Not part of the spec
        
        void setSelector(Selector selector,String messageSelector)
        {
                this.selector=selector;
                this.messageSelector=messageSelector;
        }
        
        SpyMessage getMessage()
        {
                synchronized (messages) {
                                
                        while (true) {
  
                                try {
                                        if (messages.size()==0) return null;
                                
                                        SpyMessage 
mes=(SpyMessage)messages.removeFirst();
                                
                                        if (mes.isOutdated()) {
                                                Log.notice("SessionQueue: I dropped a 
message (timeout)");
                                                continue;
                                        }
                                                
                                        if (selector!=null) {
                                                if (!selector.test(mes)) {
                                                        Log.log("SessionQueue: I 
dropped a message (selector)");
                                                        continue;
                                                } else {
                                                        Log.log("SessionQueue: 
selector evaluates TRUE");
                                                }
                                        }
                                                
                                        //the SAME Message object is put in different 
SessionQueues
                                        //when we deliver it, we have to clone() it to 
insure independance
                                        SpyMessage message=mes.myClone();
                                                                                       
                                                         
                                        if (!session.transacted) {
                                                if 
(session.acknowledgeMode==Session.CLIENT_ACKNOWLEDGE) {
                                                                
                                                        synchronized 
(sessionQueue.messagesWaitingForAck) {
                                                                //Put the message in 
the messagesWaitForAck queue
                                                                
sessionQueue.messagesWaitingForAck.addLast(message);
                                                        }
                                                                
                                                        
message.setSessionQueue(sessionQueue);
                                                                
                                                } else if 
(session.acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
                                                        //DUPS_OK_ACKNOWLEDGE
                                                } else {
                                                        //AUTO_ACKNOWLEDGE 
                                                        //we don't need to keep this 
message in a queue
                                                }
                                        } else {
                                                        
                                                //We are linked to a transacted 
session                                                                                
 
                                                        
                                                synchronized 
(sessionQueue.messagesWaitingForAck) {
                                                        //Put the message in the 
messagesWaitForAck queue
                                                        
sessionQueue.messagesWaitingForAck.addLast(message);
                                                }
                                                        
                                        }
                                                
                                        return message;
                                                                                
                                } catch (Exception e) {
                                        Log.error(e);
                                }
  
                        }
                        
                }
  
        }
        
        void addMessage(SpyMessage mes) throws JMSException
        {
                synchronized (messages) {
                        //Add a message to the queue
                        
                        //Test the priority
                        int pri=mes.getJMSPriority();
                        
                        if (pri<=4) {                           
                                //normal priority message
                                messages.addLast(mes);
                        } else {                                
                                //expedited priority message
                                int size=messages.size();
                                int i=0;                                
                                for(;i<size;i++) {
                                        if 
(((SpyMessage)messages.get(i)).getJMSPriority()<pri) break;
                                }                               
                                messages.add(i,mes);                            
                        }
                        
                }
        }
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyMessageProducer.java
  
  Index: SpyMessageProducer.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.MessageProducer;
  import javax.jms.JMSException;
  import javax.jms.DeliveryMode;
  import javax.jms.Message;
  
  /**
   *    This class implements javax.jms.MessageProducer
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyMessageProducer 
        implements MessageProducer
  {   
        // Attributes ----------------------------------------------------
  
        private boolean disableMessageID = false;
        private boolean disableTS = false;
        protected int defaultDeliveryMode = DeliveryMode.NON_PERSISTENT;
        protected int defaultPriority=4;
        protected int defaultTTL=0;
  
        // Public --------------------------------------------------------
        
      public void setDisableMessageID(boolean value) throws JMSException
        {
                disableMessageID=value;
        }
  
      public boolean getDisableMessageID() throws JMSException
        {
                return disableMessageID;
        }
        
      public void setDisableMessageTimestamp(boolean value) throws JMSException
        {
                disableTS=value;
        }
  
      public boolean getDisableMessageTimestamp() throws JMSException
        {
                return disableTS;
        }
  
      public void setDeliveryMode(int deli) throws JMSException
        {
                if (deli==Message.DEFAULT_DELIVERY_MODE) 
defaultDeliveryMode=DeliveryMode.NON_PERSISTENT;
                else if 
(deli!=DeliveryMode.NON_PERSISTENT&&deli!=DeliveryMode.PERSISTENT) throw new 
JMSException("Bad DeliveryMode value");
                else defaultDeliveryMode=deli;
        }
        
      public int getDeliveryMode() throws JMSException
        {
                return defaultDeliveryMode;
        }
  
      public void setPriority(int pri) throws JMSException
        {
                if (pri==Message.DEFAULT_PRIORITY) defaultPriority=4;
                else if (pri<0||pri>9) throw new JMSException("Bad priority value");
                else defaultPriority=pri;
        }
  
      public int getPriority() throws JMSException
        {
                return defaultPriority;
        }
  
      public void setTimeToLive(int timeToLive) throws JMSException
        {
                if (timeToLive==Message.DEFAULT_TIME_TO_LIVE) timeToLive=0;
                else if (timeToLive<0) throw new JMSException("Bad TimeToLive value");
                else defaultTTL=timeToLive;
        }
     
      public int getTimeToLive() throws JMSException
        {
                return defaultTTL;
        }
        
      public void close() throws JMSException
        {
                //Is there anything useful to do ?
                //Let the GC do its work !
        }
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyObjectMessage.java
  
  Index: SpyObjectMessage.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.ObjectMessage;
  import javax.jms.JMSException;
  import javax.jms.MessageNotWriteableException;
  import javax.jms.MessageFormatException;
  import java.io.Serializable;
  import java.io.IOException;
  import java.io.OptionalDataException;
  import java.io.ByteArrayOutputStream;
  import java.io.ObjectOutputStream;
  import java.io.ByteArrayInputStream;
  import java.io.ObjectInputStream;
  
  /**
   *    This class implements javax.jms.ObjectMessage
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyObjectMessage 
        extends SpyMessage 
        implements Cloneable, ObjectMessage
  {
        
        // Attributes ----------------------------------------------------
  
        private byte[] content=null;
        
        // Public --------------------------------------------------------
  
      public void setObject(Serializable object) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("setObject");
                try {           
                        ByteArrayOutputStream ostream = new ByteArrayOutputStream();
                        ObjectOutputStream p = new ObjectOutputStream(ostream);
  
                        p.writeObject(object);
  
                        p.flush();
                        content=ostream.toByteArray();
                        ostream.close();
                } catch (IOException e) {
                        throw new MessageFormatException("Object cannot be 
serialized");
                }                               
        }
  
      public Serializable getObject() throws JMSException
        {
                try {           
                        ByteArrayInputStream istream = new 
ByteArrayInputStream(content);
                        ObjectInputStream p = new ObjectInputStream(istream);
  
                        Serializable object=(Serializable)p.readObject();
  
                        istream.close();
  
                        return object;
                } catch (OptionalDataException e) {
                        throw new MessageFormatException("OptionalDataException");
                } catch (ClassNotFoundException e) {
                        throw new MessageFormatException("ClassNotFoundException");
                } catch (IOException e) {
                        throw new MessageFormatException("IOException");
                }                    
        }
        
      public void clearBody() throws JMSException
        {
                content=null;
                super.clearBody();                      
        }
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyQueue.java
  
  Index: SpyQueue.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.Queue;
  import javax.jms.JMSException;
  import java.io.Serializable;
  
  /**
   *    This class implements javax.jms.Queue
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyQueue
        extends SpyDestination 
        implements Queue, Serializable
  {
        
        // Constructor ---------------------------------------------------
           
        SpyQueue(String queueName)
        {
                super();
                name=queueName;
        }
  
        // Public --------------------------------------------------------
  
        public String getQueueName() throws JMSException
        {
                return name;
        }
        
        public String toString()
        {
                return "Queue@"+name;
        }
        
        // Object override -----------------------------------------------
  
        //A topic is identified by its name
        public boolean equals(Object obj)
        {
                if (obj instanceof SpyQueue)
                        return ((SpyDestination)obj).name.equals(name);
                return false;
        }
  
        public int hashCode()
        {
                return name.hashCode()+1;
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyQueueBrowser.java
  
  Index: SpyQueueBrowser.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.QueueBrowser;
  import javax.jms.Queue;
  import javax.jms.JMSException;
  import java.util.Enumeration;
  
  /**
   *    This class implements javax.jms.QueueBrowser
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyQueueBrowser 
        implements QueueBrowser 
  {
  
        //Public
  
      public Queue getQueue() throws JMSException
        {
                //Nor implemented yet
                return null;
        }
        
      public String getMessageSelector() throws JMSException
        {
                //Nor implemented yet
                return null;
        }
        
      public Enumeration getEnumeration() throws JMSException
        {
                //Nor implemented yet
                return null;
        }
        
      public void close() throws JMSException
        {
                //Nor implemented yet
                return;
        }       
  }
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyQueueConnection.java
  
  Index: SpyQueueConnection.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.QueueConnection;
  import javax.jms.JMSException;
  import javax.jms.QueueSession;
  import javax.jms.ConnectionConsumer;
  import javax.jms.ServerSessionPool;
  import javax.jms.TemporaryQueue;
  import javax.jms.Queue;
  import java.io.Serializable;
  import org.spydermq.distributed.interfaces.DistributedJMSServer;
  
  /**
   *    This class implements javax.jms.QueueConnection
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyQueueConnection 
        extends SpyConnection 
        implements Serializable, QueueConnection 
  {
  
        // Constants -----------------------------------------------------
            
        // Attributes ----------------------------------------------------
  
        // Constructor ---------------------------------------------------
           
        public SpyQueueConnection(DistributedJMSServer theServer,String cID,String 
crCN) throws JMSException
        { 
                super(theServer,cID,crCN);
        } 
  
        // Public --------------------------------------------------------
        
      public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) 
throws JMSException
        {               
                if (closed) throw new IllegalStateException("The connection is 
closed");                
                if (distributedConnection==null) createReceiver();
                                
                QueueSession session=new 
SpyQueueSession(this,transacted,acknowledgeMode,modeStop);
                
                //add the new session to the createdSessions list 
                synchronized (createdSessions) {
                        createdSessions.add(session);
                }
                
                return session;
        }
  
        public ConnectionConsumer createConnectionConsumer(Queue queue,
                                                        String messageSelector,
                                                        ServerSessionPool sessionPool,
                                                        int maxMessages) throws 
JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");                
                if (distributedConnection==null) createReceiver();
                                                                                
                //Not impelemted yet
                return null;
        }
  
        // Package protected ---------------------------------------------
            
        void sendToServer(SpyMessage[] c) throws JMSException
        {
                Log.log("Connection: sendToServer("+c.length+" msgs)");
                super.sendToServer(c);
        }
  
        TemporaryQueue getTemporaryQueue() throws JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");
                if (distributedConnection==null) createReceiver();
  
                try {
                        return provider.getTemporaryQueue(distributedConnection);
                } catch (Exception e) {
                        failureHandler(e,"Cannot create a temporary queue !");
                        return null;
                }
        }       
  
        //Get a queue
        Queue createQueue(String name) throws JMSException
        {               
                try {
                        if (closed) throw new IllegalStateException("The connection is 
closed");        
                        if (distributedConnection==null) createReceiver();             
         
                        return provider.createQueue(name);
                } catch (Exception e) {
                        failureHandler(e,"Cannot get the Queue from the provider");
                        return null;
                }
        }
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
  
  Index: SpyQueueReceiver.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.QueueReceiver;
  import javax.jms.JMSException;
  import javax.jms.Queue;
  import javax.jms.Message;
  import javax.jms.MessageListener;
  import java.util.Date;
  
  /**
   *    This class implements javax.jms.QueueReceiver
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyQueueReceiver 
        extends SpyMessageConsumer 
        implements QueueReceiver
  {
        // Attributes ----------------------------------------------------
  
        //The queue I registered
        private Queue queue;
        //Mode of this QueueReceiver
        boolean listening;
        
        // Constructor ---------------------------------------------------
           
      SpyQueueReceiver(SpyQueueSession session,Queue queue) 
        {
                super(session);
                this.queue=queue;
                listening=false;
        }
  
        // Public --------------------------------------------------------
  
        public Queue getQueue() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
                                
                return queue;
        }
  
        public void close() throws JMSException
        {       
                if (closed) return;
                closed=true;
  
                setListening(false);
        }
  
        //Overrides MessageConsumer
        
      public Message receive() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");
                        
                setListening(true);
                
                synchronized (messages) {
                        
                        //if the client follows the specification [4.4.6], he cannot 
use this session
                        //to asynchronously receive a message or receive() in another 
thread.
                        //If a message is already pending for this session, we can 
immediatly deliver it 
                        
                        while (true) {
                                
                                if (closed) {
                                        setListening(false);
                                        return null;
                                }
                                
                                if (!session.modeStop) {
                                        Message mes=getMessage();
                                        if (mes!=null) {
                                                setListening(false);
                                                return mes;
                                        }
                                } else Log.log("the connection is stopped !");
                                
                                try {
                                        waitInReceive=true;
                                        messages.wait();
                                } catch (InterruptedException e) {
                                } finally {
                                        waitInReceive=false;
                                }
                                
                        }
                        
                }
                                
        }
  
      public Message receive(long timeOut) throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");
                
                if (timeOut==0) return receive();               
                long endTime=(new Date()).getTime()+timeOut;
                
                setListening(true);
  
                synchronized (messages) {
                        
                        //if the client respects the specification [4.4.6], he cannot 
use this session
                        //to asynchronously receive a message or receive() from 
another thread.
                        //If a message is already pending for this session, we can 
deliver it 
                                                
                        while (true) {
                                
                                if (closed) {
                                        setListening(false);
                                        return null;
                                }
                                
                                if (!session.modeStop) {
                                        Message mes=getMessage();
                                        if (mes!=null) {
                                                setListening(false);
                                                return mes;
                                        }
                                } else Log.log("the connection is stopped !");
                                
                                long att=endTime-((new Date()).getTime());
                                if (att<=0) {
                                        setListening(false);
                                        return null;
                                }
                                
                                try {                                   
                                        waitInReceive=true;
                                        messages.wait(att);
                                } catch (InterruptedException e) {
                                } finally {
                                        waitInReceive=false;
                                }
                                
                        }
                }
        
        }
  
      public Message receiveNoWait() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");
                if (session.modeStop) return null;
                        
                return session.connection.queueReceiveNoWait(queue);
        }       
  
      public void setMessageListener(MessageListener listener) throws JMSException
        {       
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");                   
                
                messageListener=listener;
                setListening(listener!=null);
        }
        
        //---
        
        void setListening(boolean newvalue) throws JMSException
        {
                if (newvalue==listening) return;
                listening=newvalue;
                if (listening) sessionQueue.changeNumListening(1);
                else sessionQueue.changeNumListening(-1);
        }
  
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyQueueSender.java
  
  Index: SpyQueueSender.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.QueueSender;
  import javax.jms.JMSException;
  import javax.jms.Message;
  import javax.jms.Queue;
  import javax.jms.InvalidDestinationException;
  import java.util.Date;
  
  /**
   *    This class implements javax.jms.QueueSender
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyQueueSender 
        extends SpyMessageProducer 
        implements QueueSender
  {
        // Attributes ----------------------------------------------------
  
        //The session to which this sender is linked
        private SpyQueueSession session;
        //The queue of this sender
        private Queue queue=null;
        
        // Constructor ---------------------------------------------------
           
        SpyQueueSender(SpyQueueSession session,Queue queue)
        {
                this.session=session;
                this.queue=queue;
        }
  
        // Public --------------------------------------------------------
  
      public Queue getQueue() throws JMSException
        {
                return queue;
        }
  
        //Send methods
        
      public void send(Message message) throws JMSException
        {
                if (queue==null) throw new InvalidDestinationException("I do not have 
a default Destination !");
                send(queue,message,defaultDeliveryMode,defaultPriority,defaultTTL);
        }
  
      public void send(Queue queue, Message message) throws JMSException
        {
                send(queue,message,defaultDeliveryMode,defaultPriority,defaultTTL);
        }
  
      public void send(Message message, int deliveryMode, int priority, long 
timeToLive) throws JMSException
        {
                if (queue==null) throw new InvalidDestinationException("I do not have 
a default Destination !");
                send(queue,message,deliveryMode,priority,timeToLive);
        }
  
        public void send(Queue queue, Message mes, int deliveryMode, int priority, 
long timeToLive) throws JMSException
        {
                //We only accept our classes (for now)
                if (!(mes instanceof SpyMessage)) throw new JMSException("I cannot 
deliver this message");
                SpyMessage message=(SpyMessage)mes;
                
                //Set the header fields
                message.jmsDestination=queue;
                message.setJMSDeliveryMode(deliveryMode);
                Date ts=new Date();
                message.setJMSTimestamp(ts.getTime());
                if (timeToLive==0) {
                        message.setJMSExpiration(0);
                } else {
                        message.setJMSExpiration(timeToLive+ts.getTime());
                }
                message.setJMSPriority(priority);
                message.setJMSMessageID(session.getNewMessageID());
                
                //Set the properties and the message body in ReadOnly mode
                //the client has to call clearProperties() and clearBody() if he wants 
to modify those values
                message.setReadOnlyMode();
                
                //This message is not redelivered
                message.setJMSRedelivered(false);
                
                //We must put a 'new message' in the Session's outgoing queue [3.9]    
                         
                session.sendMessage(message.myClone());
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyQueueSession.java
  
  Index: SpyQueueSession.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.QueueSession;
  import javax.jms.Queue;
  import javax.jms.Destination;
  import javax.jms.QueueReceiver;
  import javax.jms.JMSException;
  import javax.jms.QueueSender;
  import javax.jms.TemporaryQueue;
  import javax.jms.QueueBrowser;
  import java.util.HashSet;
  import java.util.HashMap;
  import java.util.Iterator;
  
  /**
   *    This class implements javax.jms.QueueSession
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyQueueSession 
        extends SpySession 
        implements QueueSession
  {
        
        // Constructor ---------------------------------------------------
           
        SpyQueueSession(SpyConnection myConnection, boolean transacted, int 
acknowledgeMode, boolean stop)
        {
                super(myConnection,transacted,acknowledgeMode,stop);
        }
  
        // Public --------------------------------------------------------
  
      public QueueBrowser createBrowser(Queue queue) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                //Not yet implemented
                return null;
        }
  
        public QueueBrowser createBrowser(Queue queue,String messageSelector) throws 
JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                //Not yet implemented
                return createBrowser(queue);
        }
  
        public Queue createQueue(String queueName) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                return ((SpyQueueConnection)connection).createQueue(queueName);
        }
  
      public QueueReceiver createReceiver(Queue queue) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
  
                SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue);
                SessionQueue sessionQueue=addConsumer(queue,receiver);
                
                return receiver;
        }
  
      public QueueReceiver createReceiver(Queue queue, String messageSelector) throws 
JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                //Not yet implemented
                return createReceiver(queue);
        }
  
      public QueueSender createSender(Queue queue) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                return new SpyQueueSender(this,queue);
        }
      
        public TemporaryQueue createTemporaryQueue() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
                                                                         
                
                return ((SpyQueueConnection)connection).getTemporaryQueue();
        }
  
        //Not part of the spec
        
        //Called by the ConnectionReceiver object : put a new msg in the receiver's 
queue
        public void dispatchMessage(Destination dest,SpyMessage mes) throws 
JMSException
        {
                //Done in the SessionQueue :)
        }
        
        
        // Package protected ---------------------------------------------
            
        //called by a MessageProducer object which needs to send a message
        void sendMessage(SpyMessage m) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                //Synchronize with the outgoingQueue
                synchronized (outgoingQueue) 
                {
                        //Test the priority
                        int pri=m.getJMSPriority();
                        
                        if (pri<=4) {
                                
                                //normal priority message
                                outgoingQueue.addLast(m);
                                
                        } else {
                                
                                //expedited priority message
                                int size=outgoingQueue.size();
                                int i=0;
                                
                                for(;i<size;i++) {
                                        if 
(((SpyMessage)outgoingQueue.get(i)).getJMSPriority()<pri) break;
                                }
                                
                                outgoingQueue.add(i,m);
                                
                        }
                        
                }
  
                //Notify the [sleeping ?] thread that there is work to do
                //We should not wait for the lock...
                synchronized (thread)
                {
                        thread.notify();
                }
        }       
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.Session;
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import javax.jms.BytesMessage;
  import javax.jms.MapMessage;
  import javax.jms.Message;
  import javax.jms.ObjectMessage;
  import javax.jms.MessageListener;
  import javax.jms.StreamMessage;
  import javax.jms.TextMessage;
  import java.io.Serializable;
  import java.util.LinkedList;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Iterator;
  import java.util.Collection;
   
  /**
   *    This class implements javax.jms.Session
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpySession 
        implements Runnable, Session
  {     
        
        // Attributes ----------------------------------------------------
        
        //Is this session transacted ?
        protected boolean transacted;
        //What is the type of acknowledgement ?
        protected int acknowledgeMode;
        //The messageListener for this session
        private MessageListener messageListener;
        //The connection object to which this session is linked
        protected SpyConnection connection;
        //HashMap of SessionQueue by Destination
        public HashMap destinations;
        //The outgoing message queue 
        protected LinkedList outgoingQueue;
        //The outgoing message queue for messages that have been commited (if the 
session is transacted)
        protected LinkedList outgoingCommitedQueue;
        //Is my connection in stopped mode ?
        protected boolean modeStop;
        //Is the session closed ?
        boolean closed;
        //This object is the object used to synchronize the session's thread - Need 
fixed / improvement
        public Integer thread;
        //Is this session in alpha mode ?
        public boolean alphaMode;
  
        // Constructor ---------------------------------------------------             
    
  
        SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean stop)
        { 
                connection=conn;
                transacted=trans;
                acknowledgeMode=acknowledge;
                destinations=new HashMap();
                outgoingQueue=new LinkedList();
                outgoingCommitedQueue=new LinkedList();
                modeStop=stop;
                messageListener=null;
                closed=false;
                thread=new Integer(0);
                alphaMode=true;
                
                //Start my thread 
                Thread oneThread=new Thread(this);
                oneThread.setDaemon(true);
                oneThread.start();
        }
  
        // Public --------------------------------------------------------
        
        public BytesMessage createBytesMessage() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                
                return new SpyBytesMessage();
        } 
   
        public MapMessage createMapMessage() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                return new SpyMapMessage();
        }
  
        public Message createMessage() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                return new SpyMessage();
        }
  
        public ObjectMessage createObjectMessage() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                return new SpyObjectMessage();
        } 
  
        public ObjectMessage createObjectMessage(Serializable object) throws 
JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                
                ObjectMessage msg=new SpyObjectMessage();
                msg.setObject(object);
                return msg;
        }
  
        public StreamMessage createStreamMessage() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                return new SpyStreamMessage();
        }
  
        public TextMessage createTextMessage() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                return new SpyTextMessage();
        }
  
        public TextMessage createTextMessage(StringBuffer stringBuffer) throws 
JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                TextMessage msg=new SpyTextMessage();
                msg.setText(stringBuffer.toString());
                return msg;
        }
  
        public boolean getTransacted() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                return transacted;
        }
  
  
        public MessageListener getMessageListener() throws JMSException
        {               
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                return messageListener;
        }
  
        public void setMessageListener(MessageListener listener) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                         
                messageListener=listener;
        }
        
        //The thread for this session. It sends outgoing messages and delivers 
incoming ones
        public void run()
        {
                Log.log("Hi ! I'm a session thread :)");
                
                while (true) {
  
                        synchronized (thread) {
                                boolean doneJob=false;                          
                                
                                if (closed) return;
                                
                                //look at outgoing queues
                                
                                SpyMessage outgoingJob[]=null;
                                
                                if (transacted) {               
                                        synchronized (outgoingCommitedQueue) {
                                                //The session is transacted, we take 
the outgoing msgs from outgoingCommitedQueue
                                                if (outgoingCommitedQueue.size()!=0) {
                                                        SpyMessage array[]=new 
SpyMessage[outgoingCommitedQueue.size()];
                                                        
outgoingJob=(SpyMessage[])outgoingCommitedQueue.toArray(array);
                                                        outgoingCommitedQueue.clear();
                                                }                                      
                                         
                                        }
                                } else {
                                        synchronized (outgoingQueue) {
                                                //The session is not transacted, we 
take the outgoing msgs from outgoingQueue
                                                if (outgoingQueue.size()!=0) {
                                                        SpyMessage array[]=new 
SpyMessage[outgoingQueue.size()];
                                                        
outgoingJob=(SpyMessage[])outgoingQueue.toArray(array);
                                                        outgoingQueue.clear();
                                                }
                                        }
                                }
                                
                                if (outgoingJob!=null) {                               
         
                                        try {
                                                //Check for outdated messages !
                                                connection.sendToServer(outgoingJob);
                                                doneJob=true;
                                        } catch (JMSException e) {
                                                Log.log("Cannot send 
"+outgoingJob.toString()+" to the provider...");
                                                Log.error(e);
                                        }
                                }
                                
                                //if we are not in stopped mode, look at the incoming 
queue                             
                                
                                if (!modeStop) {
                                                
                                        Collection values = destinations.values();
                                        Iterator i=values.iterator();
                                        while (i.hasNext()) {
                                                SessionQueue 
sessionQueue=(SessionQueue)i.next();
                                                
doneJob=doneJob||sessionQueue.deliverMessage();
                                        }
                                        
                                }
                                        
                                //If there was smthg to do, try again
                                if (doneJob) continue;
                                        
                                try {
                                        Log.log("SessionThread: I'm going to bed...");
                                        thread.wait();
                                        Log.log("SessionThread: I wake up");
                                } catch (InterruptedException e) {                     
                
                                }
  
                        }
                }
        }
  
        public synchronized void close() throws JMSException
        {
                if (closed) return;
                closed=true;
  
                //if the thread is sleeping, kill it
                synchronized (thread) {
                        thread.notify();
                }
                
                //notify the sleeping synchronous listeners
                
                Collection values = destinations.values();
                Iterator i=values.iterator();
                while (i.hasNext()) {
                        SessionQueue sessionQueue=(SessionQueue)i.next();
                        sessionQueue.close();
                }       
                
                connection.sessionClosing(this);
                
        }
  
        public void dispatchMessage(Destination dest, SpyMessage mes) throws 
JMSException
        {
                //The job is done in inherited classes
                throw new RuntimeException("pure virtual call");
        }       
        
        //Commit a transacted session
        public synchronized void commit() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                if (!transacted) throw new IllegalStateException("The session is not 
transacted");
  
                Log.log("Session: commit()");
  
                boolean modeSav=modeStop;
                modeStop=true;
                
                //Wait for the thread to sleep
                synchronized (thread) {
                        
                        //Move the outgoing messages from the outgoingQueue to the 
outgoingCommitedQueue
                        outgoingCommitedQueue.addAll(outgoingQueue);
                        outgoingQueue.clear();
                        
                        //Notify each SessionQueue that we are going to commit
                        Collection values = destinations.values();
                        Iterator i=values.iterator();
                        while (i.hasNext()) {
                                SessionQueue sessionQueue=(SessionQueue)i.next();
                                sessionQueue.commit();
                        }       
                        
                        //We have finished our work, we can wake up the thread
                        modeStop=modeSav;
                        thread.notify();
                }
                
        }
  
        //Rollback a transacted session
        public synchronized void rollback() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                if (!transacted) throw new IllegalStateException("The session is not 
transacted");
                                                                         
                Log.log("Session: rollback()");
  
                boolean modeSav=modeStop;
                modeStop=true;
                
                //Wait for the thread to sleep
                synchronized (thread) {
                        
                        //Clear the outgoing queue
                        outgoingQueue.clear();
                        
                        //Notify each SessionQueue that we are going to rollback
                        Collection values = destinations.values();
                        Iterator i=values.iterator();
                        while (i.hasNext()) {
                                SessionQueue sessionQueue=(SessionQueue)i.next();
                                sessionQueue.recover();
                        }       
                        
                        //We have finished our work, we can wake up the thread
                        modeStop=modeSav;
                        thread.notify();
                }
        }
  
        public synchronized void recover() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");
                if (transacted) throw new IllegalStateException("The session is 
transacted");
                                                                         
                Log.log("Session: recover()");
  
                boolean modeSav=modeStop;
                modeStop=true;
                
                //Wait for the thread to sleep
                synchronized (thread) {
                        
                        //Notify each SessionQueue that we are going to recover
                        Collection values = destinations.values();
                        Iterator i=values.iterator();
                        while (i.hasNext()) {
                                SessionQueue sessionQueue=(SessionQueue)i.next();
                                sessionQueue.recover();
                        }       
                        
                        //We have finished our work, we can wake up the thread
                        modeStop=modeSav;
                        thread.notify();
                }
                
                
        }
  
        public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
        {       
                Log.log("SpySession: deleteDestination(dest="+dest.toString()+")");
                
                //Remove it from the subscribers list
                synchronized (destinations) {
                        HashMap newMap=(HashMap)destinations.clone();   
                        newMap.remove(dest);
                        destinations=newMap;
                }
                
                //We could look at our incoming and outgoing queues to drop messages
        }
        
        // Package protected ---------------------------------------------
            
        SessionQueue addConsumer(Destination dest, SpyMessageConsumer who) throws 
JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");
                                                                                
                Log.log("Session: 
subscribe(dest="+dest.toString()+",MessageConsumer="+who.toString()+")");
                
                synchronized (destinations) {
                        SessionQueue sub=(SessionQueue)destinations.get(dest);
                        if (sub==null) {
                                sub=new SessionQueue(this,dest);
                                sub.addConsumer(who);
                                HashMap newDestinations=(HashMap)destinations.clone();
                                newDestinations.put(dest,sub);
                                destinations=newDestinations;
                                connection.addSession(dest,this);
                        } else {
                                sub.addConsumer(who);
                        }               
                        return sub;
                }
        }
  
        void removeConsumer(Destination dest, SpyMessageConsumer who) throws 
JMSException
        {
                Log.log("Session: 
removeConsumer(Destination="+dest.toString()+",MessageConsumer="+who.toString()+")");
                
                synchronized (destinations) {
                        SessionQueue sub=(SessionQueue)destinations.get(dest);
                        if (sub!=null) {                                               
                 
                                boolean empty=sub.removeConsumer(who);
                                if (empty) {
                                        HashMap 
newDestinations=(HashMap)destinations.clone();
                                        newDestinations.remove(dest);
                                        destinations=newDestinations;
                                        connection.removeSession(dest,this);
                                } 
                        } else {
                                //this should not happen
                                HashMap newDestinations=(HashMap)destinations.clone();
                                newDestinations.remove(dest);
                                destinations=newDestinations;
                        }
                }                       
                
        }
                
        String getNewMessageID() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                       
 
                return connection.getNewMessageID();
        }
        
        //The connection has changed its mode (stop() or start())
        //We have to wait until message delivery has stopped or wake up the thread
        void notifyStopMode(boolean newValue)
          {
                
                if (closed) throw new IllegalStateException("The session is closed");  
                                                                  
                if (modeStop==newValue) return; 
                
                modeStop=newValue;
                
                if (modeStop) {
                        
                        //Wait for the thread to sleep
                        synchronized (thread) {
                                ;
                        }
                                                
                } else {
                        
                        //Wake up the thread
                        synchronized (thread) {
                                thread.notify();
                        }
                        
                }
                
        }
        
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyStreamMessage.java
  
  Index: SpyStreamMessage.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.StreamMessage;
  import javax.jms.JMSException;
  import javax.jms.MessageEOFException;
  import javax.jms.MessageNotWriteableException;
  import javax.jms.MessageFormatException;
  import java.util.Vector;
  
  /**
   *    This class implements javax.jms.StreamMessage
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyStreamMessage 
        extends SpyMessage 
        implements StreamMessage, Cloneable
  {
        // Attributes ----------------------------------------------------
  
        private Vector content;
        private int position;
        private int offset;
        private int size;
  
        // Constructor ---------------------------------------------------
           
        SpyStreamMessage()
        {
                msgReadOnly=false;
                content=new Vector();
                position=0;
                size=0;
                offset=0;
        }
  
        // Public --------------------------------------------------------
  
        public boolean readBoolean() throws JMSException
        {
                if (!msgReadOnly) throw new MessageNotWriteableException("The message 
body is writeonly");
                                
                try {
                        Object value=content.get(position);
                        position++;
                        offset=0;
                        
                        if (value instanceof Boolean) return 
((Boolean)value).booleanValue();
                        else if (value instanceof String) return 
Boolean.getBoolean((String)value);
                        else throw new MessageFormatException("Invalid conversion");
                
                } catch (ArrayIndexOutOfBoundsException e) {
                        throw new MessageEOFException("");
                }
                
        }
  
      public byte readByte() throws JMSException
        {
                if (!msgReadOnly) throw new MessageNotWriteableException("The message 
body is writeonly");
  
                try {
                        Object value=content.get(position);
                        position++;
                        offset=0;
                        if (value instanceof Byte) return ((Byte)value).byteValue();
                        else if (value instanceof String) return 
Byte.parseByte((String)value);
                        else throw new MessageFormatException("Invalid conversion");   
                 
                        
                } catch (ArrayIndexOutOfBoundsException e) {
                        throw new MessageEOFException("");
                }
        }
  
      public short readShort() throws JMSException
        {
                if (!msgReadOnly) throw new MessageNotWriteableException("The message 
body is writeonly");
                try {
                        Object value=content.get(position);
                        position++;
                        offset=0;
                        
                        if (value instanceof Byte) return ((Byte)value).shortValue();
                        else if (value instanceof Short) return 
((Short)value).shortValue();
                        else if (value instanceof String) return 
Short.parseShort((String)value);
                        else throw new MessageFormatException("Invalid conversion");
                        
                } catch (ArrayIndexOutOfBoundsException e) {
                        throw new MessageEOFException("");
                }
        }
  
      public char readChar() throws JMSException
        {
                if (!msgReadOnly) throw new MessageNotWriteableException("The message 
body is writeonly");
                try {
                        Object value=content.get(position);
                        position++;
                        offset=0;
  
                        if (value instanceof Character) return 
((Character)value).charValue();
                        else throw new MessageFormatException("Invalid conversion");   
                 
                        
                } catch (ArrayIndexOutOfBoundsException e) {
                        throw new MessageEOFException("");
                }
        }
  
      public int readInt() throws JMSException
        {
                if (!msgReadOnly) throw new MessageNotWriteableException("The message 
body is writeonly");
  
                try {
                        Object value=content.get(position);
                        position++;
                        offset=0;
  
                        if (value instanceof Byte) return ((Byte)value).intValue();
                        else if (value instanceof Short) return 
((Short)value).intValue();
                        else if (value instanceof Integer) return 
((Integer)value).intValue();
                        else if (value instanceof String) return 
Integer.parseInt((String)value);
                        else throw new MessageFormatException("Invalid conversion");   
                 
                        
                } catch (ArrayIndexOutOfBoundsException e) {
                        throw new MessageEOFException("");
                }
        }
  
      public long readLong() throws JMSException
        {
                if (!msgReadOnly) throw new MessageNotWriteableException("The message 
body is writeonly");
                try {
                        Object value=content.get(position);
                        position++;
                        offset=0;
                        
                        if (value instanceof Byte) return ((Byte)value).longValue();
                        else if (value instanceof Short) return 
((Short)value).longValue();
                        else if (value instanceof Integer) return 
((Integer)value).longValue();
                        else if (value instanceof Long) return 
((Long)value).longValue();               
                        else if (value instanceof String) return 
Long.parseLong((String)value);
                        else throw new MessageFormatException("Invalid conversion");   
                 
                        
                } catch (ArrayIndexOutOfBoundsException e) {
                        throw new MessageEOFException("");
                }
        }
  
      public float readFloat() throws JMSException
        {
                if (!msgReadOnly) throw new MessageNotWriteableException("The message 
body is writeonly");
                try {
                        Object value=content.get(position);
                        position++;
                        offset=0;
  
                        if (value instanceof Float) return ((Float)value).floatValue();
                        else if (value instanceof String) return 
Float.parseFloat((String)value);
                        else throw new MessageFormatException("Invalid conversion");   
                 
                        
                } catch (ArrayIndexOutOfBoundsException e) {
                        throw new MessageEOFException("");
                }
        }
  
      public double readDouble() throws JMSException
        {
                if (!msgReadOnly) throw new MessageNotWriteableException("The message 
body is writeonly");
                try {
                        Object value=content.get(position);
                        position++;
                        offset=0;
                        
                        if (value instanceof Float) return 
((Float)value).doubleValue();
                        else if (value instanceof Double) return 
((Double)value).doubleValue();
                        else if (value instanceof String) return 
Double.parseDouble((String)value);
                        else throw new MessageFormatException("Invalid conversion");   
                 
                        
                } catch (ArrayIndexOutOfBoundsException e) {
                        throw new MessageEOFException("");
                }
        }
  
      public String readString() throws JMSException
        {
                if (!msgReadOnly) throw new MessageNotWriteableException("The message 
body is writeonly");
                try {
                        Object value=content.get(position);
                        position++;
                        offset=0;
                        
                        if (value instanceof Boolean) return 
((Boolean)value).toString();
                        else if (value instanceof Byte) return 
((Byte)value).toString();
                        else if (value instanceof Short) return 
((Short)value).toString();
                        else if (value instanceof Character) return 
((Character)value).toString();
                        else if (value instanceof Integer) return 
((Integer)value).toString();
                        else if (value instanceof Long) return 
((Long)value).toString();
                        else if (value instanceof Float) return 
((Float)value).toString();
                        else if (value instanceof Double) return 
((Double)value).toString();
                        else if (value instanceof String) return (String)value;
                        else throw new MessageFormatException("Invalid conversion");   
                 
                        
                } catch (ArrayIndexOutOfBoundsException e) {
                        throw new MessageEOFException("");
                }
        }
  
      public int readBytes(byte[] value) throws JMSException
        {
                if (!msgReadOnly) throw new MessageNotWriteableException("The message 
body is writeonly");
                try {
                        Object myObj=content.get(position);
                        if (!(myObj instanceof byte[])) throw new 
MessageFormatException("Invalid conversion");
                        byte[] obj=(byte[])myObj;
                        
                        if (obj.length==0) {
                                position++;
                                offset=0;                                       
                                return 0;                                       
                        }
                        
                        if (offset>=obj.length) return -1;
                        
                        if (obj.length-offset<value.length) {                          
         
                                        
                                for(int i=0;i<obj.length;i++) value[i]=obj[i+offset];  
                                 
                                        
                                position++;
                                offset=0;
                                        
                                return obj.length-offset;
                        } else {                                                       
                         
                                for(int i=0;i<value.length;i++) value[i]=obj[i+offset];
                                offset+=value.length;
                                        
                                return value.length;
                        }                               
                                        
                } catch (ArrayIndexOutOfBoundsException e) {
                        throw new MessageEOFException("");
                }
        }
  
      public Object readObject() throws JMSException
        {
                if (!msgReadOnly) throw new MessageNotWriteableException("The message 
body is writeonly");
                try {
                        Object value=content.get(position);
                        position++;
                        offset=0;
                        
                        return value;
                        
                } catch (ArrayIndexOutOfBoundsException e) {
                        throw new MessageEOFException("");
                }
        }
  
      public void writeBoolean(boolean value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("The message 
body is readonly");
                content.add(new Boolean(value));
        }
  
      public void writeByte(byte value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("The message 
body is readonly");
                content.add(new Byte(value));
        }
  
      public void writeShort(short value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("The message 
body is readonly");
                content.add(new Short(value));
        }
  
      public void writeChar(char value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("The message 
body is readonly");
                content.add(new Character(value));
        }
  
      public void writeInt(int value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("The message 
body is readonly");
                content.add(new Integer(value));
        }
  
      public void writeLong(long value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("The message 
body is readonly");
                content.add(new Long(value));
        }
        public void writeFloat(float value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("The message 
body is readonly");
                content.add(new Float(value));
        }
  
      public void writeDouble(double value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("The message 
body is readonly");
                content.add(new Double(value));
        }
  
      public void writeString(String value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("The message 
body is readonly");
                content.add(new String(value));
        }
  
      public void writeBytes(byte[] value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("The message 
body is readonly");
                content.add(value.clone());
        }
   
      public void writeBytes(byte[] value, int offset, int length) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("The message 
body is readonly");
                
                if (offset+length>value.length) throw new JMSException("Array is too 
small");           
                byte[] temp = new byte[length];
                for(int i=0;i<length;i++) 
                        temp[i]=value[i+offset];
                
                content.add(temp);
        }
  
      public void writeObject(Object value) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("The message 
body is readonly");
                if (value instanceof Boolean) content.add(value);
                else if (value instanceof Byte) content.add(value);
                else if (value instanceof Short) content.add(value);
                else if (value instanceof Character) content.add(value);
                else if (value instanceof Integer) content.add(value);
                else if (value instanceof Long) content.add(value);
                else if (value instanceof Float) content.add(value);
                else if (value instanceof Double) content.add(value);
                else if (value instanceof String) content.add(value);
                else if (value instanceof byte[]) content.add(((byte[])value).clone());
                else throw new MessageFormatException("Invalid object type");          
 
        }
   
      public void reset() throws JMSException
        {
                msgReadOnly=true;
                position=0;
                size=content.size();
                offset=0;
        }
        
      public void clearBody() throws JMSException
        {
                content=new Vector();
                position=0;
                offset=0;
                size=0;
  
                super.clearBody();                      
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyTemporaryQueue.java
  
  Index: SpyTemporaryQueue.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.TemporaryQueue;
  import javax.jms.JMSException;
  
  /**
   *    This class implements javax.jms.TemporaryQueue
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyTemporaryQueue 
        extends SpyQueue 
        implements TemporaryQueue
  {
  
        //The DistributedConnection of its creator
        SpyDistributedConnection dc;
        
        // Constructor ---------------------------------------------------
           
        SpyTemporaryQueue(String queueName,SpyDistributedConnection dc_)
        {
                super(queueName);
                dc=dc_;
        }
  
        // Public --------------------------------------------------------
  
      public void delete() throws JMSException
        {
                try {
                        dc.cr.deleteTemporaryDestination(this);
                } catch (Exception e) {
                        Log.error(e);
                        throw new JMSException("Cannot delete the TemporaryQueue");
                }
        }
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyTemporaryTopic.java
  
  Index: SpyTemporaryTopic.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.TemporaryTopic;
  import javax.jms.JMSException;
  
  /**
   *    This class implements javax.jms.TemporaryTopic
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyTemporaryTopic 
        extends SpyTopic 
        implements TemporaryTopic
  {
        
        //The DistributedConnection of its creator
        SpyDistributedConnection dc;
        
        // Constructor ---------------------------------------------------
           
        SpyTemporaryTopic(String topicName, SpyDistributedConnection dc_)
        {
                super(topicName);
                dc=dc_;
        }
  
        // Public --------------------------------------------------------
  
      public void delete() throws JMSException
        {
                try {
                        dc.cr.deleteTemporaryDestination(this);
                } catch (Exception e) {
                        Log.error(e);
                        throw new JMSException("Cannot delete the TemporaryTopic");
                }
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyTextMessage.java
  
  Index: SpyTextMessage.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.TextMessage;
  import javax.jms.JMSException;
  import javax.jms.MessageNotWriteableException;
  
  /**
   *    This class implements javax.jms.TextMessage
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyTextMessage 
        extends SpyMessage 
        implements Cloneable,TextMessage
  { 
  
        // Attributes ----------------------------------------------------
  
        private String content=null;
        
        // Public --------------------------------------------------------
  
        public void setText(String string) throws JMSException
        {
                if (msgReadOnly) throw new MessageNotWriteableException("Cannot set 
the content");
                content=string;
        }
  
      public String getText() throws JMSException
        {
                return content;
        }
        
      public void clearBody() throws JMSException
        {
                content=null;
                super.clearBody();                      
        }
        
        
        // Object override -----------------------------------------------
        
        public String toString()
        {
                try {
                        return "TextMessage@"+getText();
                } catch (JMSException e) {
                        return "toString() failed !";
                }               
        }
  
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyTopic.java
  
  Index: SpyTopic.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.Topic;
  import javax.jms.JMSException;
  import java.io.Serializable;
  
  /**
   *    This class implements javax.jms.Topic
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyTopic 
        extends SpyDestination 
        implements Topic, Serializable
  {
        
        // Constructor ---------------------------------------------------
           
        SpyTopic(String topicName)
        {
                super();
                name=topicName;
        }
  
        // Public --------------------------------------------------------
  
        public String getTopicName() throws JMSException
        {
                return name;
        }
        
        public String toString()
        {
                return "Topic@"+name;
        }
        
        // Object override -----------------------------------------------
  
        //A topic is identified by its name
        public boolean equals(Object obj)
        {
                if (obj instanceof SpyTopic)
                        return ((SpyDestination)obj).name.equals(name);
                return false;
        }
  
        public int hashCode()
        {
                return name.hashCode();
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyTopicConnection.java
  
  Index: SpyTopicConnection.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.TopicConnection;
  import javax.jms.JMSException;
  import javax.jms.TopicSession;
  import javax.jms.TemporaryTopic;
  import javax.jms.ConnectionConsumer;
  import javax.jms.ServerSessionPool;
  import javax.jms.Topic;
  import java.io.Serializable;
  import org.spydermq.distributed.interfaces.DistributedJMSServer;
  
  /**
   *    This class implements javax.jms.TopicConnection
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyTopicConnection 
        extends SpyConnection 
        implements Serializable, TopicConnection 
  {
  
        // Constants -----------------------------------------------------
            
        // Attributes ----------------------------------------------------
  
        // Constructor ---------------------------------------------------
           
        public SpyTopicConnection(DistributedJMSServer theServer,String cID,String 
crCN) throws JMSException
        { 
                super(theServer,cID,crCN);
        } 
  
        // Public --------------------------------------------------------
        
      public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) 
throws JMSException
        {               
                if (closed) throw new IllegalStateException("The connection is 
closed");                
                if (distributedConnection==null) createReceiver();
                                
                TopicSession session=new 
SpyTopicSession(this,transacted,acknowledgeMode,modeStop);
                
                //add the new session to the createdSessions list 
                synchronized (createdSessions) {
                        createdSessions.add(session);
                }
                
                return session;
        }
  
        public ConnectionConsumer createConnectionConsumer(Topic topic,
                                                        String messageSelector,
                                                        ServerSessionPool sessionPool,
                                                        int maxMessages) throws 
JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");                
                if (distributedConnection==null) createReceiver();
                                                                                
                //Not impelemted yet
                return null;
        }
  
      public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 
                                                        String messageSelector,
                              ServerSessionPool sessionPool, 
                                                        int maxMessages) throws 
JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");                
                if (distributedConnection==null) createReceiver();
                                                                                
                //Not impelemted yet
                return null;
        }
  
        // Package protected ---------------------------------------------
            
        void sendToServer(SpyMessage[] c) throws JMSException
        {
                Log.log("Connection: sendToServer("+c.length+" msgs)");
                super.sendToServer(c);
        }
  
        TemporaryTopic getTemporaryTopic() throws JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");
                if (distributedConnection==null) createReceiver();
  
                try {
                        return provider.getTemporaryTopic(distributedConnection);
                } catch (Exception e) {
                        failureHandler(e,"Cannot create a temporary topic !");
                        return null;
                }
        }
        
        Topic createTopic(String name) throws JMSException
        {               
                try {
                        if (closed) throw new IllegalStateException("The connection is 
closed");        
                        if (distributedConnection==null) createReceiver();             
         
                        return provider.createTopic(name);
                } catch (Exception e) {
                        failureHandler(e,"Cannot get the topic from the provider");
                        return null;
                }
        }
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyTopicPublisher.java
  
  Index: SpyTopicPublisher.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.TopicPublisher;
  import javax.jms.JMSException;
  import javax.jms.Message;
  import javax.jms.Topic;
  import javax.jms.InvalidDestinationException;
  import java.util.Date;
  
  /**
   *    This class implements javax.jms.TopicPublisher
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyTopicPublisher 
        extends SpyMessageProducer 
        implements TopicPublisher
  {
        // Attributes ----------------------------------------------------
  
        //The session to which this publisher is linked
        private SpyTopicSession mySession;
        //The topic of this publisher
        private Topic myTopic=null;
        
        // Constructor ---------------------------------------------------
           
        SpyTopicPublisher(SpyTopicSession s,Topic t)
        {
                mySession=s;
                myTopic=t;
        }
  
        // Public --------------------------------------------------------
  
        public Topic getTopic() throws JMSException
        {
                return myTopic;
        }
  
        //Publish methods
        
        public void publish(Message message) throws JMSException
        {
                if (myTopic==null) throw new InvalidDestinationException("I do not 
have a default Destination !");
                
publish(myTopic,message,defaultDeliveryMode,defaultPriority,defaultTTL);
        }
        
        public void publish(Topic topic, Message message) throws JMSException
        {
                publish(topic,message,defaultDeliveryMode,defaultPriority,defaultTTL);
        }
        
        public void publish(Message message, int deliveryMode, int priority, long 
timeToLive) throws JMSException
        {
                if (myTopic==null) throw new InvalidDestinationException("Destination 
is null !");
                publish(myTopic,message,deliveryMode,priority,timeToLive);
        }
  
        public void publish(Topic topic, Message mes, int deliveryMode, int priority, 
long timeToLive) throws JMSException
        {
                //We only accept our classes (for now)
                if (!(mes instanceof SpyMessage)) throw new JMSException("I cannot 
deliver this message");
                SpyMessage message=(SpyMessage)mes;
                
                //Set the header fields
                message.jmsDestination=topic;
                message.setJMSDeliveryMode(deliveryMode);
                Date ts=new Date();
                message.setJMSTimestamp(ts.getTime());
                if (timeToLive==0) {
                        message.setJMSExpiration(0);
                } else {
                        message.setJMSExpiration(timeToLive+ts.getTime());
                }
                message.setJMSPriority(priority);
                message.setJMSMessageID(mySession.getNewMessageID());
                
                //Set the properties and the message body in ReadOnly mode
                //the client has to call clearProperties() and clearBody() if he wants 
to modify those values
                message.setReadOnlyMode();
                
                //This message is not redelivered
                message.setJMSRedelivered(false);               
  
                //We must put a 'new message' in the Session's outgoing queue [3.9]    
                         
                mySession.sendMessage(message.myClone());
        }
                
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyTopicSession.java
  
  Index: SpyTopicSession.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.TopicSession;
  import javax.jms.Topic;
  import javax.jms.Destination;
  import javax.jms.TopicSubscriber;
  import javax.jms.JMSException;
  import javax.jms.TopicPublisher;
  import javax.jms.TemporaryTopic;
  import java.util.Collection;
  import java.util.HashSet;
  import java.util.HashMap;
  import java.util.Iterator;
  import org.spydermq.selectors.Selector; 
  
  /**
   *    This class implements javax.jms.TopicSession
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyTopicSession 
        extends SpySession 
        implements TopicSession
  {
        
        // Constructor ---------------------------------------------------
           
        SpyTopicSession(SpyConnection myConnection, boolean transacted, int 
acknowledgeMode, boolean stop)
        {
                super(myConnection,transacted,acknowledgeMode,stop);
        }
  
        // Public --------------------------------------------------------
  
      public Topic createTopic(String topicName) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
  
                return ((SpyTopicConnection)connection).createTopic(topicName);
        }
  
      public TopicSubscriber createSubscriber(Topic topic) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                return createSubscriber(topic,null,false);
        }
  
      public TopicSubscriber createSubscriber(Topic topic, String messageSelector, 
boolean noLocal) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                
                SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal);
                SessionQueue sessionQueue=addConsumer(topic,sub);
                
                if (messageSelector!=null) {
                        Selector selector=new Selector(messageSelector);        
                        sub.setSelector(selector,messageSelector);
                }
  
                return sub;
        }
  
      public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws 
JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                //Not yet implemented
                return createSubscriber(topic);
        }
  
      public TopicSubscriber createDurableSubscriber(Topic topic, String name, String 
messageSelector, boolean noLocal) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                //Not yet implemented
                return createSubscriber(topic);
        }
  
      public TopicPublisher createPublisher(Topic topic) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                return new SpyTopicPublisher(this,topic);
        }
      
        public TemporaryTopic createTemporaryTopic() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
                                                                         
                return ((SpyTopicConnection)connection).getTemporaryTopic();
        }
  
      public void unsubscribe(String name) throws JMSException
        {
                //Not yet implemented
        }
        
        
        // - Package protected ---------------------------------------------
        // - Not part of the spec
        
        //Called by the ConnectionReceiver object : put a new msg in the receiver's 
queue
        public void dispatchMessage(Destination dest,SpyMessage mes) throws 
JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                Log.log("Session: 
dispatchMessage(Destination="+dest.toString()+",Mes="+mes.toString()+")");
                
                if (mes.isOutdated()) return;
  
                //Get the SessionQueue for this Destination
                SessionQueue sessionQueue=(SessionQueue)destinations.get(dest);
                if (sessionQueue==null) return;
                
                //Work on the set of SpyTopicSubscriber for this topic          
                Iterator i=sessionQueue.subscribers.iterator();                        
 
                while (i.hasNext()) {   
                        SpyTopicSubscriber sub=(SpyTopicSubscriber)i.next();
                        sub.addMessage(mes);
                }
                
        }
        
            
        //called by a MessageProducer object which needs to publish a message
        void sendMessage(SpyMessage m) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                //Synchronize with the outgoingQueue
                synchronized (outgoingQueue) 
                {
                        //Test the priority
                        int pri=m.getJMSPriority();
                        
                        if (pri<=4) {
                                
                                //normal priority message
                                outgoingQueue.addLast(m);
                                
                        } else {
                                
                                //expedited priority message
                                int size=outgoingQueue.size();
                                int i=0;
                                
                                for(;i<size;i++) {
                                        if 
(((SpyMessage)outgoingQueue.get(i)).getJMSPriority()<pri) break;
                                }
                                
                                outgoingQueue.add(i,m);
                                
                        }
                        
                }
  
                //notify the thread that there is work to do
                //we should change this...
                synchronized (thread)
                {
                        thread.notify();
                }
  
        }
        
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java
  
  Index: SpyTopicSubscriber.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.TopicSubscriber;
  import javax.jms.JMSException;
  import javax.jms.Topic;
  import javax.jms.Message;
  import javax.jms.MessageListener;
  import java.util.Date;
  
  import org.spydermq.selectors.Selector;
  
  /**
   *    This class implements javax.jms.TopicSubscriber
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyTopicSubscriber 
        extends SpyMessageConsumer 
        implements TopicSubscriber
  {
        // Attributes ----------------------------------------------------
  
        //The topic I registered
        private Topic topic;
        //Am I in local mode ?
        boolean local;
  
        // Constructor ---------------------------------------------------
           
      SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean local) 
        {
                super(session);
                this.topic=topic;
                this.local=local;
        }
  
        // Public --------------------------------------------------------
  
        public Topic getTopic() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");                                   
                return topic;
        }
  
      public boolean getNoLocal() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");                   
                return local;
        }
        
        //Overrides MessageConsumer
  
        public void close() throws JMSException
        {               
                if (closed) return;
                closed=true;
  
                session.removeConsumer(topic,this);
                
                if (waitInReceive&&messageListener==null) {
                        
                        //A consumer could be waiting in receive()
                        synchronized (messages) {
                                messages.notify();
                        }
                        
                }
        }
                
      public Message receive() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
                
                synchronized (messages) {
                        
                        //if the client follows the specification [4.4.6], he cannot 
use this session
                        //to asynchronously receive a message or receive() in another 
thread.
                        //If a message is already pending for this session, we can 
immediatly deliver it 
                        
                        while (true) {
                                
                                if (closed) return null;
                                
                                if (!session.modeStop) {
                                        Message mes=getMessage();
                                        if (mes!=null) return mes;
                                } else Log.notice("the connection is stopped !");
                                
                                try {
                                        waitInReceive=true;
                                        messages.wait();
                                } catch (InterruptedException e) {
                                } finally {
                                        waitInReceive=false;
                                }
                                
                        }
                }
        }
  
      public Message receive(long timeOut) throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
                
                if (timeOut==0) return receive();
                
                long endTime=(new Date()).getTime()+timeOut;
                
                synchronized (messages) {
                        
                        //if the client respects the specification [4.4.6], he cannot 
use this session
                        //to asynchronously receive a message or receive() from 
another thread.
                        //If a message is already pending for this session, we can 
deliver it 
                                                
                        while (true) {
                                
                                if (closed) return null;
                                
                                if (!session.modeStop) {
                                        Message mes=getMessage();
                                        if (mes!=null) return mes;
                                } else Log.notice("the connection is stopped !");
                                
                                long att=endTime-((new Date()).getTime());
                                if (att<=0) return null;
                                
                                try {                                   
                                        waitInReceive=true;
                                        messages.wait(att);
                                } catch (InterruptedException e) {
                                } finally {
                                        waitInReceive=false;
                                }
                                
                        }
                }
                
        }
  
      public Message receiveNoWait() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
                
                synchronized (messages) {
                        
                        while (true) {
                                if (session.modeStop) return null;
                                return getMessage();
                        }
                                                
                }
        }
  
      public void setMessageListener(MessageListener listener) throws JMSException
        {       
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
                
                messageListener=listener;
                
                //Signal the change to the session thread ( it could sleep, while 
there are messages for him )
                synchronized (session.thread) {
                        session.thread.notify();
                }
        }
        
  }
  
  
  

Reply via email to