User: hiram   
  Date: 00/12/23 07:48:27

  Modified:    src/java/org/spydermq/server JMSServer.java
                        PersistenceManager.java SpyderMQService.java
                        StartServer.java
  Added:       src/java/org/spydermq/server AbstractQueue.java
                        ClientConsumer.java ExclusiveQueue.java
                        JMSDestination.java SharedQueue.java Task.java
  Removed:     src/java/org/spydermq/server JMSServerQueue.java
                        JMSServerQueueReceiver.java
  Log:
  These changes were done to add the following features:
  The selector is now evaluated at the server side.
  The infrastructure has been laid for durable topic subscriptions.
  The QueueBrowser has been implemented.
  Queues now can have a Selector.
  
  Revision  Changes    Path
  1.5       +226 -183  spyderMQ/src/java/org/spydermq/server/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServer.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- JMSServer.java    2000/12/21 22:33:59     1.4
  +++ JMSServer.java    2000/12/23 15:48:24     1.5
  @@ -19,6 +19,7 @@
   
   import org.spydermq.*;
   import org.spydermq.security.SecurityManager;
  +import org.spydermq.xml.XElement;
   
   /**
    *   This class implements the JMS provider
  @@ -26,7 +27,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class JMSServer 
                implements Runnable, JMSServerMBean
  @@ -65,10 +66,15 @@
         */
        private boolean stopped = true;
   
  +
  +     //The list of ClientConsumers hased by SpyDistributedConnections
  +     HashMap clientConsumers = new HashMap();
  +     XElement serverConfig;
  +
        /////////////////////////////////////////////////////////////////////
        // Constructors
        /////////////////////////////////////////////////////////////////////   
  -     public JMSServer(SecurityManager securityManager)
  +     public JMSServer()
        {
   
                taskQueue=new LinkedList();
  @@ -76,7 +82,7 @@
                
                for(int i=0;i<NB_THREADS;i++) 
                {
  -                     Thread oneThread=new Thread(this);
  +                     Thread oneThread=new Thread(this,"JMSServer");
                        oneThread.setDaemon(true);
                        oneThread.setName(new Integer(i).toString());
                        oneThread.start();
  @@ -84,8 +90,6 @@
                
                lastID=1;
                lastTemporaryTopic=1;
  -             this.securityManager=securityManager;
  -             
        }
   
   
  @@ -108,51 +112,50 @@
        //This is a correct threading system, but this is not ideal... 
        //We should let threads cycle through the JMSServerQueue list, and 
synchronized on the queue they are working on.       
        public void run() {
  -                     while (alive) {
  -                             JMSServerQueue queue = null;                    
                
  -                             this.stopped = false;
  +             while (alive) {
   
  -                             //Wait (and sleep) until it can find something to do
  -                             synchronized (taskQueue) {
  -                                     while (queue == null && alive) {               
                         
  -                                             
  -                                             // size() is O(1) in LinkedList... 
  -                                             int size=taskQueue.size(); 
  -                                             if (size!=0) { 
  -                                                     
  -                                                     //<DEBUG>
  -                                                     queue = 
(JMSServerQueue)taskQueue.removeFirst();
  -                                                     
//queue=(JMSServerQueue)taskQueue.getFirst();
  -                                                     //</DEBUG>
  -                                                     
  -                                                     //One other thread can start 
working on the task queue...
  -                                                     if (size > 1) {
  -                                                             taskQueue.notify();
  -                                                     }
  -                                             } else {        
  -                                                     try {
  -                                                             //Log.log("I'm going 
to bed...");
  -                                                             taskQueue.wait(5000);
  -                                                             //Log.log("I wake up");
  -                                                     } catch (InterruptedException 
e) {
  -                                                     }
  +                     Task task = null;
  +                     this.stopped = false;
  +
  +                     //Wait (and sleep) until it can find something to do
  +                     synchronized (taskQueue) {
  +                             while (task == null && alive) {
  +
  +                                     // size() is O(1) in LinkedList... 
  +                                     int size = taskQueue.size();
  +                                     if (taskQueue.size() != 0) {
  +
  +                                             task = (Task) taskQueue.removeFirst();
  +
  +                                             //One other thread can start working 
on the task queue...
  +                                             if (size > 1) {
  +                                                     taskQueue.notify();
                                                }
                                                
  -                                     }
  -                             }
  -                             
  -                             if (alive) {
  -                                     //Ask the queue to do its job
  -                                     try {
  -                                             queue.doMyJob();
  -                                     } catch (JMSException e) {
  -                                             Log.error(e);
  +                                     } else {
  +                                             
  +                                             try {
  +                                                     taskQueue.wait(5000);
  +                                             } catch (InterruptedException e) {
  +                                             }
                                        }
  +
                                }
                        }
  -                     Log.log("JMS service stopped.");
  -                     this.stopped = true;
  +
  +                     if (!alive)     break;
  +                     
  +                     //Ask the queue to do its job
  +                     try {
  +                             task.run();
  +                     } catch (JMSException e) {
  +                             Log.error(e);
  +                     }
  +             }
  +
  +             Log.log("JMS service stopped.");
  +             this.stopped = true;
        }
   
        public void stopServer() {
  @@ -167,7 +170,7 @@
                SpyTopic newTopic=new SpyTopic(name);
                if (messageQueue.containsKey(newTopic)) throw new JMSException("This 
topic already exists !");
                
  -             JMSServerQueue queue=new JMSServerQueue(newTopic,null,this);
  +             JMSDestination queue=new JMSDestination(newTopic,null,this);
                
                //Add this new JMSServerQueue to the list
                synchronized (messageQueue) {
  @@ -187,7 +190,7 @@
                SpyQueue newQueue=new SpyQueue(name);
                if (messageQueue.containsKey(newQueue)) throw new JMSException("This 
queue already exists !");
                
  -             JMSServerQueue queue=new JMSServerQueue(newQueue,null,this);
  +             JMSDestination queue=new JMSDestination(newQueue,null,this);
                
                //Add this new JMSServerQueue to the list
                synchronized (messageQueue) {
  @@ -198,10 +201,6 @@
        
                return newQueue;
        }
  -
  -     // -----------------------------------------
  -     // Callbacks for the invocation layer ------
  -     // -----------------------------------------
        
        //Get a new ClientID for a connection
        public String getID()
  @@ -220,33 +219,18 @@
                return ID;
        }
        
  -     public synchronized SpyTopic createTopic(String name) throws JMSException
  -     {
  -             Log.log("createTopic("+name+")");
  -
  -             SpyTopic newTopic=new SpyTopic(name);
  -             if (!messageQueue.containsKey(newTopic)) throw new JMSException("This 
destination does not exist !");
  -             return newTopic;                
  -     }
  -
  -     public synchronized SpyQueue createQueue(String name) throws JMSException
  -     {
  -             Log.log("createQueue("+name+")");
  -
  -             SpyQueue newQueue=new SpyQueue(name);
  -             if (!messageQueue.containsKey(newQueue)) throw new JMSException("This 
destination does not exist !");
  -             return newQueue;
  -     }
  -
        public synchronized TemporaryTopic getTemporaryTopic(SpyDistributedConnection 
dc) throws JMSException
        {
                SpyTemporaryTopic topic=new SpyTemporaryTopic("JMS_TT"+(new 
Integer(lastTemporaryTopic++).toString()),dc);
   
  +             ClientConsumer ClientConsumer = getClientConsumer(dc);
                synchronized (messageQueue) {
  -                     JMSServerQueue queue=new JMSServerQueue(topic,dc,this);
  +
  +                     JMSDestination queue = new 
JMSDestination(topic,ClientConsumer,this);
                        HashMap newMap=(HashMap)messageQueue.clone();
                        newMap.put(topic,queue); 
                        messageQueue=newMap;
  +                     
                }
                
                return topic;
  @@ -254,157 +238,49 @@
        
        public synchronized TemporaryQueue getTemporaryQueue(SpyDistributedConnection 
dc) throws JMSException
        {
  +             
                SpyTemporaryQueue newQueue=new SpyTemporaryQueue("JMS_TQ"+(new 
Integer(lastTemporaryQueue++).toString()),dc);
   
  +             ClientConsumer ClientConsumer = getClientConsumer(dc);
                synchronized (messageQueue) {
  -                     JMSServerQueue sessionQueue=new 
JMSServerQueue(newQueue,dc,this);
  +                     
  +                     JMSDestination queue = new 
JMSDestination(newQueue,ClientConsumer,this);
                        HashMap newMap=(HashMap)messageQueue.clone();
  -                     newMap.put(newQueue,sessionQueue); 
  +                     newMap.put(newQueue,queue);
                        messageQueue=newMap;
  +                     
                }
                
                return newQueue;
        }
  -
  -     //A connection is closing [error or notification]
  -     public synchronized void connectionClosing(SpyDistributedConnection 
dc,JMSServerQueue noCheck)
  -     {
  -             Log.log("connectionClosing(dc="+dc+",noCheck="+noCheck+")");
  -             
  -             if (dc==null) return;
  -             
  -             //unregister its clientID
  -             if (dc.getClientID()!=null)
  -                     securityManager.removeID(dc.getClientID());
  -             
  -             //Remove the connection from the subscribers list
  -             synchronized (messageQueue) {
  -
  -                     HashMap newMap=null;
  -                     Iterator i=messageQueue.values().iterator();
  -                     boolean modified=false; //don't waste memory
  -     
  -                     while (i.hasNext()) {
  -                             
  -                             JMSServerQueue sq=(JMSServerQueue)i.next();
  -                                                             
  -                             if (dc.equals(sq.temporaryDestination)) {
  -                                     if (!modified) 
newMap=(HashMap)messageQueue.clone();
  -                                     newMap.remove(sq.destination);
  -                                     modified=true;
  -                             } else {
  -                                     if (sq==noCheck) continue;
  -                                     sq.connectionClosing(dc);
  -                             }
  -                             
  -                     }
  -             
  -                     if (modified) messageQueue=newMap;
  -             }
  -             
  -     }
  -     
  -     public synchronized void deleteTemporaryDestination(SpyDestination dest)
  -     {
  -             Log.log("deleteDestination(dest="+dest.toString()+")");
  -
  -             synchronized (messageQueue) {
  -                     HashMap newMap=(HashMap)messageQueue.clone();   
  -                     newMap.remove(dest);
  -                     messageQueue=newMap;
  -             }
                
  -     }
  -                     
        public void checkID(String ID) throws JMSException
        {
                securityManager.addClientID(ID);
        }
  -     
  -     //Sent by a client to Ack or Nack a message.
  -     public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem 
item) throws JMSException
  -     {
  -             acknowledge(dc, item, null);
  -     }
  -     
  -     
  +             
        //A connection has sent a new message
        public void addMessage(SpyDistributedConnection dc, SpyMessage val) throws 
JMSException 
        {
                addMessage( dc, val, null);             
        }       
        
  -     public void connectionListening(SpyDistributedConnection dc,boolean 
mode,Destination dest) throws JMSException
  -     {
  -             JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(dest);
  -             if (serverQueue==null) throw new JMSException("This destination does 
not exist !");
  -             
  -             serverQueue.connectionListening(mode,dc);
  -     }
  -     
        public org.spydermq.security.SecurityManager getSecurityManager() {
                return securityManager;
        }       
  -
  -     //Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
  -     public SpyMessage queueReceive(SpyDistributedConnection dc,Queue queue, long 
wait) throws JMSException
  -     {
  -             Log.log("JMSserver: queueReceive(queue="+queue+",wait="+wait+")");
  -             JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(queue);
  -             if (serverQueue==null) throw new JMSException("This destination does 
not exist !");
  -             
  -             return serverQueue.queueReceive(wait,dc);
  -             
  -     }       
  -     
  -     //A connection object wants to subscribe to a Destination
  -     public void subscribe(SpyDistributedConnection dc,Destination dest) throws 
JMSException
  -     {
  -             Log.log("Server: 
subscribe(dest="+dest.toString()+",idConnection="+dc.getClientID()+")");
  -             
  -             JMSServerQueue queue=(JMSServerQueue)messageQueue.get(dest);
  -             if (queue==null) throw new JMSException("This destination does not 
exist !");
  -             queue.addSubscriber(dc);
  -     }               
  -     
  -     
  -     
  -     public void unsubscribe(SpyDistributedConnection dc,Destination dest) throws 
JMSException
  -     {
  -             Log.log("Server: 
unsubscribe(dest="+dest.toString()+",idConnection="+dc.getClientID()+")");
  -                             
  -             JMSServerQueue queue=(JMSServerQueue)messageQueue.get(dest);
  -             if (queue==null) throw new JMSException("This destination does not 
exist !");
  -             queue.removeSubscriber(dc,null);
  -     }
  -
  -     //Sent by a client to Ack or Nack a message.
  -     public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem 
item, Long txId) throws JMSException
  -     {               
  -             JMSServerQueue 
serverQueue=(JMSServerQueue)messageQueue.get(item.jmsDestination);
  -             if (serverQueue==null) throw new JMSException("Destination does not 
exist: "+item.jmsDestination);      
  -             serverQueue.acknowledge(dc, item, txId);
  -     }       
        
        //A connection has sent a new message
        public void addMessage(SpyDistributedConnection dc, SpyMessage val, Long txId) 
throws JMSException 
        {
        
                Log.notice("INCOMING: (TX="+txId+")"+dc.getClientID()+" => 
"+val.jmsDestination);       
  -             JMSServerQueue 
queue=(JMSServerQueue)messageQueue.get(val.jmsDestination);
  +             JMSDestination 
queue=(JMSDestination)messageQueue.get(val.jmsDestination);
                if (queue==null) throw new JMSException("This destination does not 
exist !");   
                //Add the message to the queue
                queue.addMessage(val, txId);
                
        }
        
  -     public JMSServerQueue getServerQueue(SpyDestination d) throws JMSException 
  -     {               
  -             JMSServerQueue queue=(JMSServerQueue)messageQueue.get(d);
  -             if (queue==null) throw new JMSException("This destination does not 
exist !");
  -             return queue;   
  -     }
  -     
        /**
         * The following function performs a Unit Of Work.
         *
  @@ -469,5 +345,172 @@
                        
                }
   
  +     }
  +
  +
  +
  +     //Sent by a client to Ack or Nack a message.
  +     public void acknowledge(SpyDistributedConnection dc, AcknowledgementRequest 
item) throws JMSException
  +     {
  +             acknowledge(dc, item, null);
  +     }
  +
  +     //Sent by a client to Ack or Nack a message.
  +     public void acknowledge(SpyDistributedConnection dc, AcknowledgementRequest 
item, Long txId) throws JMSException
  +     {
  +
  +             ClientConsumer queue = getClientConsumer(dc);
  +             queue.acknowledge(item, txId);
  +             
  +     }
  +
  +     //A connection is closing [error or notification]
  +     public synchronized void connectionClosing(SpyDistributedConnection dc) throws 
JMSException 
  +     {
  +             Log.log("JMSServer->connectionClosing(dc="+dc+")");             
  +             if (dc==null) return;
  +
  +             // Close it's ClientConsumer
  +             ClientConsumer cq = (ClientConsumer)clientConsumers.remove( dc );
  +             if( cq != null ) {
  +                     cq.close();
  +             }
  +
  +             //unregister its clientID
  +             if (dc.getClientID()!=null)
  +                     securityManager.removeID(dc.getClientID());
  +             
  +             //Remove any temporary destinations the consumer may have created.
  +             synchronized (messageQueue) {
  +
  +                     Iterator i=messageQueue.values().iterator();
  +                     while (i.hasNext()) {
  +                             
  +                             JMSDestination sq=(JMSDestination)i.next();            
                                                 
  +                             if (dc.equals(sq.temporaryDestination)) {
  +                                     i.remove();
  +                             } 
  +                     }
  +             
  +             }               
  +                     
  +     }
  +
  +     public void connectionFailure(SpyDistributedConnection dc) throws JMSException
  +     {
  +             Log.log("JMSServer->connectionFailure(dc="+dc+")");
  +
  +             //We should try again :) This behavior should under control of a 
Failure-Plugin         
  +             Log.error("I remove the Connection "+dc.getClientID()+" from the 
subscribers list");
  +
  +             connectionClosing(dc);
  +     }
  +
  +     // Gets the ClientConsumers mapped the the connection
  +     // If the connection is not mapped, a new ClientConsumer is created
  +     public ClientConsumer getClientConsumer(SpyDistributedConnection dc) throws 
JMSException
  +     {
  +             ClientConsumer cq = (ClientConsumer)clientConsumers.get( dc );
  +             if( cq == null ) {
  +                     cq = new ClientConsumer(this, dc);
  +                     clientConsumers.put( dc, cq );
  +             }
  +             return cq;
  +     }
  +     
  +     public JMSDestination getJMSDestination(SpyDestination dest) 
  +     {
  +             return (JMSDestination)messageQueue.get(dest);  
  +     }
  +
  +     //A connection object wants to subscribe to a Destination
  +     public void subscribe(SpyDistributedConnection dc, Subscription sub) throws 
JMSException
  +     {
  +             Log.log("Server: 
subscribe(dest="+sub.destination+",idConnection="+dc.getClientID()+")");
  +
  +             ClientConsumer ClientConsumer = getClientConsumer(dc);
  +
  +             ClientConsumer.addSubscription(sub);
  +             
  +     }
  +
  +     public void unsubscribe(SpyDistributedConnection dc, int subscriptionId) 
throws JMSException
  +     {
  +             Log.log("Server: unsubscribe(idConnection="+dc.getClientID()+")");
  +
  +             ClientConsumer ClientConsumer = getClientConsumer(dc);
  +
  +             ClientConsumer.removeSubscription(subscriptionId);
  +
  +     }
  +
  +     public SpyMessage[] browse(SpyDistributedConnection dc, Destination dest, 
String selector) throws JMSException {
  +             //ClientConsumer.addSubscription(sub);
  +             JMSDestination queue = (JMSDestination)messageQueue.get(dest);
  +             if( queue == null )
  +                     throw new JMSException("That destination does not exist");
  +                     
  +             return queue.browse(selector);
  +     }
  +     public void listenerChange(SpyDistributedConnection dc, int subscriberId, 
boolean state) throws JMSException {
  +
  +             ClientConsumer ClientConsumer = getClientConsumer(dc);
  +             ClientConsumer.listenerChange(subscriberId, state);
  +             
  +     }
  +     public SpyMessage receive(SpyDistributedConnection dc, int subscriberId, long 
wait) throws JMSException {
  +             ClientConsumer ClientConsumer = getClientConsumer(dc);
  +             return ClientConsumer.receive(subscriberId, wait);
  +     }
  +     public void setEnabled(SpyDistributedConnection dc, boolean enabled) throws 
JMSException {
  +             ClientConsumer ClientConsumer = getClientConsumer(dc);
  +             ClientConsumer.setEnabled(enabled);
  +     }
  +     
  +     public synchronized Queue createQueue(SpyDistributedConnection dc, String 
name) throws JMSException
  +     {
  +             Log.log("createQueue("+name+")");
  +
  +             SpyQueue newQueue=new SpyQueue(name);
  +             if (!messageQueue.containsKey(newQueue)) throw new JMSException("This 
destination does not exist !");
  +             return newQueue;
  +     }
  +
  +     public synchronized Topic createTopic(SpyDistributedConnection dc, String 
name) throws JMSException
  +     {
  +             Log.log("createTopic("+name+")");
  +
  +             SpyTopic newTopic=new SpyTopic(name);
  +             if (!messageQueue.containsKey(newTopic)) throw new JMSException("This 
destination does not exist !");
  +             return newTopic;                
  +     }
  +
  +     public synchronized void deleteTemporaryDestination(SpyDistributedConnection 
dc, SpyDestination dest)
  +     {
  +             Log.log("deleteDestination(dest="+dest.toString()+")");
  +
  +             synchronized (messageQueue) {
  +                     HashMap newMap=(HashMap)messageQueue.clone();   
  +                     newMap.remove(dest);
  +                     messageQueue=newMap;
  +             }
  +             
  +     }
  +     
  +     public void restoreMessage(SpyMessage message, String queueId) throws 
JMSException
  +     {
  +             JMSDestination 
queue=(JMSDestination)messageQueue.get(message.jmsDestination);
  +             if (queue==null) throw new JMSException("This destination does not 
exist !");   
  +             //Add the message to the queue
  +             queue.restoreMessage(message, queueId);
  +     }
  +
  +     public void saveConfig() throws java.io.IOException {
  +             
  +             String file = 
getClass().getClassLoader().getResource("spyderMQ.xml").getFile();
  +             java.io.PrintStream stream = new java.io.PrintStream( new 
java.io.FileOutputStream(file));
  +             stream.print( serverConfig.toXML(true) );
  +             stream.close();
  +             
        }
   }
  
  
  
  1.3       +92 -72    spyderMQ/src/java/org/spydermq/server/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/PersistenceManager.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- PersistenceManager.java   2000/12/19 06:43:36     1.2
  +++ PersistenceManager.java   2000/12/23 15:48:24     1.3
  @@ -19,7 +19,6 @@
   import org.spydermq.persistence.SpyMessageLog;
   import org.spydermq.SpyDestination;
   import org.spydermq.SpyMessage;
  -
   import org.spydermq.SpyDistributedConnection;
   
   /**
  @@ -27,7 +26,7 @@
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class PersistenceManager {
   
  @@ -43,7 +42,52 @@
        HashMap messageLogs = new HashMap();
        // Maps (Long)txIds to LinkedList of Runnable tasks
        HashMap postCommitTasks = new HashMap();
  +     // Maps Global transactions to local transactions
  +     HashMap globalToLocal = new HashMap();
  +     // Maps (Long)txIds to LinkedList of Runnable tasks
  +     HashMap postRollbackTasks = new HashMap();
   
  +     class GlobalXID implements Runnable {
  +             SpyDistributedConnection dc;
  +             Object xid;
  +
  +             GlobalXID(SpyDistributedConnection dc,Object xid) {
  +                     this.dc = dc;
  +                     this.xid = xid;
  +             }
  +             
  +             public boolean equals(Object obj)
  +             {
  +                     if (obj==null) return false;                    
  +                     if (obj.getClass()!=GlobalXID.class) return false;
  +                     return ((GlobalXID)obj).xid.equals( xid ) &&
  +                                ((GlobalXID)obj).dc.equals(dc);
  +             }
  +             
  +             public int hashCode() {
  +                     return xid.hashCode();
  +             }
  +
  +             public void run() {
  +                     synchronized (globalToLocal) {
  +                             globalToLocal.remove(this);                     
  +                     }
  +             }
  +     }
  +
  +     static class LogInfo {
  +             SpyMessageLog log;
  +             SpyDestination destination;
  +             String queueId;
  +
  +             LogInfo(SpyMessageLog log, SpyDestination destination, String queueId) 
{
  +                     this.log=log;
  +                     this.destination=destination;
  +                     this.queueId=queueId;
  +             }
  +             
  +     }
  +             
        /**
         * PersistenceManager constructor.
         */
  @@ -68,26 +112,29 @@
        }
   
        
  -     public void add(org.spydermq.SpyDestination dest, org.spydermq.SpyMessage 
message, Long txId) throws javax.jms.JMSException {
  +     public void add(String queueId, org.spydermq.SpyMessage message, Long txId) 
throws javax.jms.JMSException {
   
  -             SpyMessageLog log;
  +             LogInfo logInfo;
   
                synchronized (messageLogs) {
  -                     log = (SpyMessageLog) messageLogs.get(dest);
  +                     logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
                }
   
  -             if (log == null)
  +             if (logInfo == null)
                        throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
   
  -             synchronized (log) {
  -                     log.add(message, txId);
  -             }
  +             logInfo.log.add(message, txId);
   
        }
   
        
        public void addPostCommitTask(Long txId, Runnable task) throws 
javax.jms.JMSException {
   
  +             if( txId == null ) {
  +                     task.run();
  +                     return;
  +             }
  +             
                LinkedList tasks;
                synchronized (postCommitTasks) {
                        tasks = (LinkedList) postCommitTasks.get(txId);
  @@ -135,38 +182,19 @@
        }
   
        
  -     public void initQueue(org.spydermq.SpyDestination dest) throws 
javax.jms.JMSException {
  -
  -             try {
  -
  -                     URL logFile = new URL(dataDirectory, dest.getName() + "-queue" 
+ ".dat");
  -                     SpyMessageLog log = new SpyMessageLog(logFile.getFile());
  -
  -                     messageLogs.put(dest, log);
  -
  -             } catch (javax.jms.JMSException e) {
  -                     throw e;
  -             } catch (Exception e) {
  -                     javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
  -                     newE.setLinkedException(e);
  -                     throw newE;
  -             }
  -
  -     }
  -
        
  -     public void remove(org.spydermq.SpyDestination dest, org.spydermq.SpyMessage 
message, Long txId) throws javax.jms.JMSException {
  +     public void remove(String queueId, org.spydermq.SpyMessage message, Long txId) 
throws javax.jms.JMSException {
   
  -             SpyMessageLog log;
  +             LogInfo logInfo;
   
                synchronized (messageLogs) {
  -                     log = (SpyMessageLog) messageLogs.get(dest);
  +                     logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
                }
   
  -             if (log == null)
  +             if (logInfo == null)
                        throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
   
  -             log.remove(message, null);
  +             logInfo.log.remove(message, null);
   
        }
        
  @@ -179,19 +207,19 @@
                        clone = (HashMap) messageLogs.clone();
                }
   
  -             Iterator iter = clone.keySet().iterator();
  +             Iterator iter = clone.values().iterator();
                while (iter.hasNext()) {
  -                     SpyDestination dest = (SpyDestination) iter.next();
  +             
  +                     LogInfo logInfo = (LogInfo)iter.next();                        
 
   
  -                     JMSServerQueue q = server.getServerQueue(dest);
  -                     SpyMessageLog log = (SpyMessageLog) clone.get(dest);
  +                     JMSDestination q = 
server.getJMSDestination(logInfo.destination);
   
  -                     SpyMessage rebuild[] = log.restore(commitedTXs);
  +                     SpyMessage rebuild[] = logInfo.log.restore(commitedTXs);
   
  -                     //TODO: make sure this sync lock is good enough
  +                     //TODO: make sure this lock is good enough
                        synchronized (q) {
                                for (int i = 0; i < rebuild.length; i++) {
  -                                     q.restoreMessage(rebuild[i]);
  +                                     q.restoreMessage(rebuild[i], logInfo.queueId);
                                        q.messageIdCounter = 
Math.max(q.messageIdCounter, rebuild[i].messageId + 1);
                                }
                        }
  @@ -220,42 +248,13 @@
                }
                        
        }
  -
  -     // Maps Global transactions to local transactions
  -     HashMap globalToLocal = new HashMap();
  -     // Maps (Long)txIds to LinkedList of Runnable tasks
  -     HashMap postRollbackTasks = new HashMap();
  -
  -     class GlobalXID implements Runnable {
  -             SpyDistributedConnection dc;
  -             Object xid;
  +     
  +     public void addPostRollbackTask(Long txId, Runnable task) throws 
javax.jms.JMSException {
   
  -             GlobalXID(SpyDistributedConnection dc,Object xid) {
  -                     this.dc = dc;
  -                     this.xid = xid;
  -             }
  -             
  -             public boolean equals(Object obj)
  -             {
  -                     if (obj==null) return false;                    
  -                     if (obj.getClass()!=GlobalXID.class) return false;
  -                     return ((GlobalXID)obj).xid.equals( xid ) &&
  -                                ((GlobalXID)obj).dc.equals(dc);
  +             if( txId == null ) {
  +                     return;
                }
                
  -             public int hashCode() {
  -                     return xid.hashCode();
  -             }
  -
  -             public void run() {
  -                     synchronized (globalToLocal) {
  -                             globalToLocal.remove(this);                     
  -                     }
  -             }
  -     }
  -
  -     public void addPostRollbackTask(Long txId, Runnable task) throws 
javax.jms.JMSException {
  -
                LinkedList tasks;
                synchronized( postRollbackTasks ) {
                        tasks = (LinkedList)postRollbackTasks.get(txId);
  @@ -294,5 +293,26 @@
                        throw new JMSException("Transaction does not exist from: 
"+dc.getClientID()+" xid="+xid);
                        
                return txid;
  +     }
  +
  +     public void initQueue( SpyDestination dest, String queueId ) throws 
javax.jms.JMSException {
  +
  +             try {
  +
  +                     URL logFile = new URL(dataDirectory, 
dest.toString()+"-"+queueId+".dat");
  +                     SpyMessageLog log = new SpyMessageLog(logFile.getFile());
  +
  +                     LogInfo info = new LogInfo(log, dest, queueId);
  +                     
  +                     messageLogs.put(""+dest+"-"+queueId, info);
  +
  +             } catch (javax.jms.JMSException e) {
  +                     throw e;
  +             } catch (Exception e) {
  +                     javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
  +                     newE.setLinkedException(e);
  +                     throw newE;
  +             }
  +
        }
   }
  
  
  
  1.4       +1 -1      spyderMQ/src/java/org/spydermq/server/SpyderMQService.java
  
  Index: SpyderMQService.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/SpyderMQService.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SpyderMQService.java      2000/12/21 22:33:59     1.3
  +++ SpyderMQService.java      2000/12/23 15:48:25     1.4
  @@ -4,7 +4,6 @@
    * Distributable under GPL license.
    * See terms of license at gnu.org.
    */
  - 
   package org.spydermq.server;
   
   import java.io.File;
  @@ -27,8 +26,9 @@
    *
    *   @author Vincent Sheffer ([EMAIL PROTECTED])
    *   @author <a href="mailto:[EMAIL PROTECTED]">Juha Lindfors</a>
  + *   @author Hiram Chirino ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class SpyderMQService
      extends ServiceMBeanSupport
  
  
  
  1.7       +6 -3      spyderMQ/src/java/org/spydermq/server/StartServer.java
  
  Index: StartServer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/StartServer.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- StartServer.java  2000/12/21 22:33:59     1.6
  +++ StartServer.java  2000/12/23 15:48:25     1.7
  @@ -48,7 +48,7 @@
    *   @author Vincent Sheffer ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.6 $
  + *   @version $Revision: 1.7 $
    */
   public class StartServer implements Runnable
   {
  @@ -160,11 +160,14 @@
                        //Get an InitialContext
                        InitialContext ctx=new InitialContext();
   
  +                     //Create the JMSServer object
  +                     theServer = new JMSServer();
  +
  +                     theServer.serverConfig = serverCfg;
  +
                        //Create a SecurityManager object
                        securityManager=new SecurityManager();
  -
  -                     //Create the JMSServer object
  -                     theServer = new JMSServer(securityManager);
  +                     theServer.securityManager = securityManager;
   
                        PersistenceManager persistenceManager = new 
PersistenceManager(theServer, serverCfg.getElement("PersistenceManager"));
                        theServer.persistenceManager = persistenceManager;
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/server/AbstractQueue.java
  
  Index: AbstractQueue.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.JMSException;
  import org.spydermq.SpyMessage;
  
  /**
   * This defines the interface for the queues.  This is implemented by
   * SharedQueue and ExclusiveQueue
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public interface AbstractQueue {
        public void addConsumer(ClientConsumer consumer) throws JMSException;
        public void addMessage(SpyMessage mes, Long txId) throws JMSException;
        void notifyMessageAvailable();
        public void removeConsumer(ClientConsumer consumer) throws JMSException;
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/server/ClientConsumer.java
  
  Index: ClientConsumer.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import javax.jms.DeliveryMode;
  
  import org.spydermq.*;
  import org.spydermq.persistence.SpyMessageLog;
  import org.spydermq.xml.XElement;
  
  import java.util.Iterator;
  import java.util.Hashtable;
  import java.util.LinkedList;
  import java.util.HashMap;
  import java.util.TreeSet;
  
  
  /**
   *  This represent the clients queue which consumes messages from the 
   *  destinations on the provider.
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class ClientConsumer implements Task {
  
        //List of messages waiting to be transmitted to the client
        private LinkedList messages = new LinkedList();
        //The JMSServer object
        JMSServer server;
        //The connection this queue will send messages over
        SpyDistributedConnection dc;
        //Maps a destination to a LinkedList of Subscriptions
        public HashMap destinationSubscriptions = new HashMap();
        //Is this connection enabled (Can we transmit to the receiver)
        boolean enabled;
        //Maps a a subsction id to a Subscription
        public HashMap subscriptions = new HashMap();
        //LinkedList of the the temporary destinations that this client created
        public LinkedList temporaryDestinations = new LinkedList();
        //List of messages that should be acked or else returned to thier
        //owning exclusive queues.
        public HashMap unacknowledgedMessages = new HashMap();
  
        // Constructor ---------------------------------------------------         
        public ClientConsumer(JMSServer server, SpyDistributedConnection dc) throws 
JMSException
        {
                this.server=server;
                this.dc = dc;                   
        }
  
        void acknowledge(AcknowledgementRequest item, Long txId) throws 
javax.jms.JMSException {
  
                Log.log(""+this+"->acknowledge(item="+item+",txId="+txId+")");
                
                // This task gets run to place the neg ack a messge (place it back on 
the queue)
                class RestoreMessageTask implements Runnable {
                        SpyMessage message;
                        int subscriptionId;
                        RestoreMessageTask(SpyMessage m,int subscriptionId) { message 
= m; this.subscriptionId=subscriptionId; }
                        public void run() {
                                Log.log("Restoring message: " + message.jmsMessageID);
                                String queueId = JMSDestination.DEFAULT_QUEUE_ID;
                                if( message.jmsDestination instanceof SpyTopic ) {
                                        // Still need to implement
                                        //queueId 
                                }
                                try {
                                        server.restoreMessage(message,queueId);
                                } catch (JMSException ignore ) {
                                }
                        }
                }               
                
                SpyMessage m;
                synchronized (unacknowledgedMessages) {
                        m = (SpyMessage)unacknowledgedMessages.remove(item);
                }
  
                if (m == null)
                        return;
                        
                // Was it a negative acknowledge??
                if (!item.isAck) {
                
                        Runnable task = new RestoreMessageTask(m, item.subscriberId);
                        server.persistenceManager.addPostCommitTask(txId, task);
                        
                } else {
                        
                        if( m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) {
                                if( m.getJMSDestination() instanceof SpyQueue )
                                        server.persistenceManager.remove("queue", 
m,txId);
                                else
                                        
server.persistenceManager.remove(dc.getClientID(), m,txId);
                        }
                        
                        Runnable task = new RestoreMessageTask(m,item.subscriberId);
                        server.persistenceManager.addPostRollbackTask(txId, task);
                                
                        Log.log("Message Ack: " + m.messageId);
                }
  
        }
  
        public void addMessage(SpyMessage message) throws JMSException
        {
                Log.log(""+this+"->addMessage(message="+message+")");
  
                LinkedList l = (LinkedList)destinationSubscriptions.get( 
message.getJMSDestination() );
                if( l == null )
                        throw new JMSException("No subscription found for that 
destination.");
  
                Iterator subs = l.iterator();           
                while(  subs.hasNext() ) {
                        
                        Subscription s = (Subscription)subs.next();
                        if( s.accepts( message, false ) ) {
                        
                                synchronized (messages) {
  
                                        ReceiveRequest r = new ReceiveRequest();
                                        r.message = message;
                                        
                                        messages.add(r);
                                        
                                }
                                return;
                        }
                }
  
                        
        }
  
        public void addSubscription(Subscription req) throws JMSException
        {
                Log.log(""+this+"->addSubscription(req="+req+")");
                
                req.dc = dc;
                
                synchronized (subscriptions ) {
  
                        subscriptions.put(new Integer(req.subscriptionId), req );
                                
                        LinkedList ll = (LinkedList)destinationSubscriptions.get( 
req.destination );
                        if( ll == null ) {
                                ll = new LinkedList();
                                
                                destinationSubscriptions.put(req.destination, ll );
                                
                                JMSDestination 
queue=(JMSDestination)server.getJMSDestination(req.destination);
                                if (queue==null) throw new JMSException("This 
destination does not exist !");
                                
                                if( queue.isTopic ) {
                                        if( req.durableSubscriptionName!=null ) {
                                                // 
queue.addExclusiveConsumer(dc.getClientID(), this);
                                        } else {
                                                queue.addSharedConsumer(this);         
                 
                                        }
                                } else  {
                                        
queue.addExclusiveConsumer(queue.DEFAULT_QUEUE_ID, this);
                                }
                        }
  
                        ll.add( req );
                }                       
        }
  
        // 
        public void close() {
  
                Log.log(""+this+"->close()");
  
                synchronized (subscriptions) {
                        Iterator i = subscriptions.keySet().iterator();
                        while( i.hasNext() ) {
                                Integer subscriptionId = (Integer)i.next();
                                try {
                                        removeSubscription(subscriptionId.intValue());
                                } catch ( JMSException ignore ) {
                                }
                        }
                        
                }
                
                synchronized (unacknowledgedMessages) {
                        Iterator i = unacknowledgedMessages.keySet().iterator();
                        while( i.hasNext() ) {
                                
                                AcknowledgementRequest item = 
(AcknowledgementRequest)i.next();
                                try {
                                        acknowledge(item, null);                       
 
                                } catch ( JMSException ignore ) {
                                }
                        }                       
                }
                
        }
  
        public org.spydermq.server.AbstractQueue getSubscribedQueue( Subscription req 
) throws javax.jms.JMSException {
  
                JMSDestination 
queue=(JMSDestination)server.getJMSDestination(req.destination);
                if (queue==null) throw new JMSException("This destination does not 
exist !");
  
                if( queue.isTopic ) {
                        if( req.durableSubscriptionName!=null ) {
                                return null;
                        } else {
                                return queue.sharedQueue;
                        }
                } else  {
                        return queue.getExclusiveQueue(queue.DEFAULT_QUEUE_ID);
                }
                
        }
  
        public void listenerChange(int subscriberId, boolean state) throws 
JMSException {
  
                Log.log(""+this+"->listenerChange(subscriberId="+subscriberId+", 
state="+state+")");
  
                Subscription req = (Subscription)subscriptions.get(new 
Integer(subscriberId));
                if( req == null )
                        throw new JMSException("The provided subscription does not 
exist");
  
                req.listening = state;
  
                if( req.listening ) {
                        // Notify the queue. It could be waiting for a consumer
                        getSubscribedQueue(req).notifyMessageAvailable();
                }
                
        }
  
        public void notifyMessageAvailable() {
  
                Log.log(""+this+"->notifyMessageAvailable()");
  
                synchronized (messages) {
                        if( messages.size() == 0 )
                                return;                 
                }
                        
                synchronized (server.taskQueue) {
                        server.taskQueue.addLast(this);
                        server.taskQueue.notify();
                }
                
        }
  
        public SpyMessage receive(int subscriberId, long wait) throws JMSException {
  
                Log.log(""+this+"->receive(subscriberId="+subscriberId+", 
wait="+wait+")");
                
                Subscription req = (Subscription)subscriptions.get(new 
Integer(subscriberId));
                if( req == null )
                        throw new JMSException("The provided subscription does not 
exist");
  
                JMSDestination queue = server.getJMSDestination(req.destination);
                if( queue == null )
                        throw new JMSException("The subscription's destination does 
not exist");
  
                // Is it a receiveNoWait()
                if( wait == -1 ) {
                        if( queue.isTopic ) {
                                if( req.durableSubscriptionName!=null ) {
                                        // Not Implemented yet
                                        //return 
queue.getExclusiveQueue(queue.DEFAULT_QUEUE_ID).receiveMessage();
                                }
                        } else  {
                                return 
queue.getExclusiveQueue(queue.DEFAULT_QUEUE_ID).receiveMessage();
                        }
                }
  
                // Notify the queue. It could be waiting for a consumer
                req.receiving = true;
                getSubscribedQueue(req).notifyMessageAvailable();
                return null;
                
        }
  
        public void removeSubscription(int subscriptionId) throws JMSException
        {
                
Log.log(""+this+"->removeSubscription(subscriberId="+subscriptionId+")");
  
                Subscription req;
                synchronized (subscriptions ) {
  
                        req = (Subscription)subscriptions.remove(new 
Integer(subscriptionId));
                        
                        if( req == null )
                                throw new JMSException("The subscription had not been 
previously registered");
                                
                        LinkedList ll = (LinkedList)destinationSubscriptions.get( 
req.destination );
                        if( ll == null ) 
                                throw new JMSException("The subscription was not 
registered with the destination");
                                
                        ll.remove( req );
                        if( ll.size() != 0 )
                                return;
                        
                        // There is no subscriber for the destination at this point    
 
                        destinationSubscriptions.remove(req.destination);
                        
                        JMSDestination 
queue=(JMSDestination)server.getJMSDestination(req.destination);
                        if (queue==null) 
                                throw new JMSException("The subscription was registed 
with a destination that does not exist !");
  
                        if( queue.isTopic ) {
                                if( req.durableSubscriptionName!=null ) {
                                        // 
queue.addExclusiveConsumer(dc.getClientID(), this);
                                } else {
                                        queue.removeSharedConsumer(this);
                                }
                        } else  {
                                queue.removeExclusiveConsumer(queue.DEFAULT_QUEUE_ID, 
this);
                        }
  
                }
        }
  
        // Iterate over the consumers asking them to take messages until they stop
        // consuming.
        synchronized public void run() throws JMSException 
        {               
                Log.log(""+this+"->run()");
                ReceiveRequest[] job;
                
                synchronized (messages) {                       
                        if( messages.size() == 0 )
                                return;
                                
                        job=new ReceiveRequest[messages.size()];
                        job=(ReceiveRequest[])messages.toArray(job);
                        messages.clear();                       
                }
  
                try {
                        dc.cr.receive(job);
                } catch ( Exception e ) {
                        server.connectionFailure(dc);
                }
                        
        }
  
        // Get the first message off the queue that I can.  return false if none taken.
        public boolean scanExclusiveQueue( ExclusiveQueue queue ) throws JMSException {
  
                Log.log(""+this+"->scanExclusiveQueue(queue="+queue+")");
                
                Iterator i = queue.messages.iterator();
                while( i.hasNext() ) {
                        
                        SpyMessage message = (SpyMessage)i.next();
  
                        synchronized (subscriptions) {
                                
                                LinkedList l = 
(LinkedList)destinationSubscriptions.get( message.getJMSDestination() );
                                if( l == null )
                                        throw new JMSException("No subscription found 
for that destination.");
  
                                Iterator subs = l.iterator();
                                
                                while(  subs.hasNext() ) {
                                        
                                        Subscription s = (Subscription)subs.next();
                                        if( s.accepts( message, true ) ) {
  
                                                s.receiving = false;
                                                i.remove();
                                                
                                                synchronized (messages) {
  
                                                        ReceiveRequest r = new 
ReceiveRequest();
                                                        r.message = message;
                                                        r.subscriptionId = new 
Integer(s.subscriptionId);
                                                        
                                                        messages.add(r);
                                                        
                                                        AcknowledgementRequest ack = 
new AcknowledgementRequest();
                                                        ack.destination = 
message.getJMSDestination();
                                                        ack.messageID = 
message.getJMSMessageID();
                                                        ack.subscriberId = 
s.subscriptionId;
                                                        ack.isAck = false;
                                                        
unacknowledgedMessages.put(ack, message);
                                                        
                                                }
                                                notifyMessageAvailable();
  
                                                return true;
                                                
                                        }
                                }
                                
                        }
                }
                
                return false;
        }
  
        public void setEnabled(boolean enabled) {
                Log.log(""+this+"->setEnabled(enabled="+enabled+")");
                this.enabled = enabled;
        }
  
        public String toString() {
                return "ClientConsumer:"+dc.getClientID();
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/server/ExclusiveQueue.java
  
  Index: ExclusiveQueue.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import javax.jms.DeliveryMode;
  
  import org.spydermq.*;
  import org.spydermq.persistence.SpyMessageLog;
  import org.spydermq.selectors.Selector;
  
  import java.util.Iterator;
  import java.util.Hashtable;
  import java.util.LinkedList;
  import java.util.HashMap;
  import java.util.TreeSet;
  
  
  /**
   *    This class represents a queue which provides it's messages
   *  exclusivly to one consumer at a time.
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class ExclusiveQueue implements Task, AbstractQueue {
  
        //List of messages waiting to be dispatched
        TreeSet messages = new TreeSet();
        //The JMSServer object
        JMSServer server;
        //DistributedConnection objs that have "registered" to this Destination
        private LinkedList consumers = new LinkedList();
        //The queueId needed to identify this queue with the persistence manager.
        String queueId;
  
                
        //Used to put a message that was added previously to the queue, back in the 
queue
        public void restoreMessage(SpyMessage mes) 
        {
                //restore a message to the message list...
                synchronized (messages) {
                        messages.add(mes);      
                }
                notifyMessageAvailable();
        }       
        
        public void addMessage(SpyMessage mes, Long txId) throws JMSException
        {
                Log.log(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
  
                // This task gets run to make the message visible in the queue.
                class AddMessagePostCommitTask implements Runnable {
                        SpyMessage message;
                        
                        AddMessagePostCommitTask(SpyMessage m) {
                                message = m;
                        }
                        
                        public void run() {
                                //restore a message to the message list...
                                synchronized (messages) {
                                        messages.add(message);  
                                }
                                notifyMessageAvailable();
                        }
                }
                
                // Persist the message if it was persistent
                if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) 
                        server.persistenceManager.add(queueId, mes, txId);
  
                // The message gets added to the queue after the transaction
                // commits (if the message was transacted)      
                Runnable task = new AddMessagePostCommitTask(mes);
                if( txId == null ) {
                        task.run();
                } else {
                        server.persistenceManager.addPostCommitTask(txId, task);
                }
                
        }
        
        // Constructor ---------------------------------------------------         
        public ExclusiveQueue(JMSServer server, String queueId) throws JMSException
        {
                
                this.server=server;
                this.queueId = queueId;
                                        
        }
  
        // synchrnozed so no message dispatching occurs while we add a consumer
        synchronized public void addConsumer(ClientConsumer consumer) throws 
JMSException
        {
                //We want to avoid removeSubscriber, addSubscriber or sendOneMessage 
to work concurently
                synchronized (consumers) {
                        consumers.add(consumer);
                }
        }
  
        public SpyMessage[] browse(String selector) throws JMSException {
                        
                if( selector == null ) {
                        SpyMessage list[];
                        synchronized (messages) {
                                list = new SpyMessage[messages.size()];
                                list = (SpyMessage [])messages.toArray(list);
                        }
                        return list;
                } else {
                        Selector s = new Selector( selector );
                        LinkedList selection=new LinkedList();
                        
                        synchronized (messages) {
                                Iterator i = messages.iterator();
                                while( i.hasNext() ) {
                                        SpyMessage m = (SpyMessage)i.next();
                                        if( s.test(m) )
                                                selection.add(m);
                                }
                        }
                        
                        SpyMessage list[];
                        list = new SpyMessage[selection.size()];
                        list = (SpyMessage [])selection.toArray(list);
                        return list;                    
                }
        }
  
        public void notifyMessageAvailable() {
  
                Log.log(""+this+"->notifyMessageAvailable()");
                
                synchronized (server.taskQueue) {
                        server.taskQueue.addLast(this);
                        server.taskQueue.notify();
                }
                
        }
  
        //Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
        public SpyMessage receiveMessage() throws  JMSException
        {
                synchronized (messages) {
                        if (messages.size()==0) 
                                return null;
                                
                        SpyMessage m = (SpyMessage)messages.first();
                        messages.remove(m);
                        
                        return m;
                }
        }
  
        public void removeConsumer(ClientConsumer consumer) throws JMSException
        {
                synchronized (consumers) {
                        consumers.remove(consumer);
                }
        }
  
        // Iterate over the consumers asking them to take messages until they stop
        // consuming.
        public void run() throws JMSException 
        {               
                
                Log.log(""+this+"->run()");
                
                synchronized (messages) {                                              
 
                        synchronized (consumers) {
                                                                
                                LinkedList consumersDone = new LinkedList();
  
                                while( consumers.size()!=0 && messages.size() != 0) {
                                        ClientConsumer consumer = 
(ClientConsumer)consumers.removeFirst();
  
                                        // Tell the consumer to scan the message queue
                                        if( consumer.scanExclusiveQueue(this) ) {
                                                // The consumer consumed a message.
                                                // So place him at the back of the 
consumer list
                                                consumers.addLast(consumer);
                                        } else {
                                                // The consumer did not find a message 
to consume,
                                                // Place him at the back of the done 
list
                                                consumersDone.addLast(consumer);
                                        }
                                }
  
                                // Add all the consumers that were done, back into the 
consumer
                                // list.
                                while( consumersDone.size() != 0 ) {
                                        consumers.addLast(consumersDone.removeFirst());
                                }
                                        
                        }                                                              
                 
                }
                                
        }
  
        public String toString() {
                return "ExclusiveQueue:"+queueId;
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/server/JMSDestination.java
  
  Index: JMSDestination.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import javax.jms.DeliveryMode;
  
  import org.spydermq.*;
  import org.spydermq.persistence.SpyMessageLog;
  
  import java.util.Iterator;
  import java.util.Hashtable;
  import java.util.LinkedList;
  import java.util.HashMap;
  import java.util.TreeSet;
  
  /**
   *    This class is a message queue which is stored (hashed by Destination) on the 
   *    JMS provider
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class JMSDestination {
  
        public static final String DEFAULT_QUEUE_ID = "queue";
  
        //the Destination of this queue
        SpyDestination destination;
        //If this is a temporaryDestination, temporaryDestination=ClientConsumer of 
the owner, otherwise it's null
        ClientConsumer temporaryDestination;
        //The JMSServer object
        JMSServer server;
        //Am I a queue or a topic  
        boolean isTopic;
        //Counter used to number incomming messages. (Used to order the messages.)
        long messageIdCounter = Long.MIN_VALUE;
        //Hashmap of ExclusiveQueues 
        HashMap exclusiveQueues = new HashMap();
        //ShareQueue used for topics
        SharedQueue sharedQueue;
  
        // Constructor ---------------------------------------------------         
        JMSDestination(SpyDestination dest,ClientConsumer temporary,JMSServer server) 
throws JMSException
        {
                destination=dest;
                temporaryDestination=temporary;
                this.server=server;
                isTopic=dest instanceof SpyTopic;
                
                sharedQueue = new SharedQueue(server);
                
                // If this is not a temp destination, then we should persist data
                if( temporaryDestination == null ) {
                        if( isTopic ) {
                                // Not Implemented yet
                                // TODO: init durable topic subscriber exclusive 
queues here
                        } else {
                                exclusiveQueues.put(DEFAULT_QUEUE_ID, new 
ExclusiveQueue(server, DEFAULT_QUEUE_ID));
                                server.persistenceManager.initQueue(dest, 
DEFAULT_QUEUE_ID);
                        }
                }
                
        }
  
        public void addMessage(SpyMessage mes, Long txId) throws JMSException
        {
        
                Log.log(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
                //Number the message so that we can preserve order of delivery.
                mes.messageId = messageIdCounter++;
  
                if( isTopic ) {
                        
                        sharedQueue.addMessage(mes, txId);
                        
                        synchronized (exclusiveQueues) {
                                
                                if( exclusiveQueues.size() == 0 )
                                        return;
                
                                Iterator iter = exclusiveQueues.values().iterator();
                                while( iter.hasNext() ) {
                                        ExclusiveQueue eq = 
(ExclusiveQueue)iter.next();
                                        eq.addMessage(mes, txId);
                                }
                        }
                        
                } else {
                        
                        ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( 
DEFAULT_QUEUE_ID );
                        eq.addMessage(mes, txId);
                        
                }
                
        }
        
  
        // Package protected ---------------------------------------------          
        void addExclusiveConsumer(String queue, ClientConsumer c) throws JMSException {
  
                Log.log(""+this+"->addExclusiveConsumer(queue="+queue+", 
consumer="+c+")");
                
                ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( queue );
                if( eq == null )
                        throw new JMSException("That destination queue does not 
exist");
                
                eq.addConsumer(c);
        }
  
        // Package protected ---------------------------------------------          
        void addSharedConsumer(ClientConsumer c) throws JMSException {
                Log.log(""+this+"->addSharedConsumer(consumer="+c+")");
                sharedQueue.addConsumer(c);
        }
  
        public SpyMessage[] browse(String selector) throws JMSException {
                Log.log(""+this+"->browse(selector="+selector+")");             
                ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( 
DEFAULT_QUEUE_ID );
                return eq.browse( selector );
        }
  
        // Package protected ---------------------------------------------          
        ExclusiveQueue getExclusiveQueue(String queue) throws JMSException {
                
                ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( queue );
                if( eq == null )
                        throw new JMSException("That destination queue does not 
exist");
                
                return eq;
        }
  
        // Package protected ---------------------------------------------          
        void removeConsumerFromAll(ClientConsumer c) throws JMSException {
                Log.log(""+this+"->removeConsumerFromAll(consumer="+c+")");
  
                sharedQueue.removeConsumer(c);
                Iterator i = exclusiveQueues.values().iterator();
                while ( i.hasNext() ) {
                        ExclusiveQueue eq = (ExclusiveQueue)i.next();
                        eq.removeConsumer(c);
                }
                
        }
  
        // Package protected ---------------------------------------------          
        void removeExclusiveConsumer(String queue, ClientConsumer c) throws 
JMSException {
  
                Log.log(""+this+"->removeExclusiveConsumer(queue="+queue+", 
consumer="+c+")");
                
                ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( queue );
                if( eq == null )
                        throw new JMSException("That destination queue does not 
exist");
                
                eq.removeConsumer(c);
        }
  
        // Package protected ---------------------------------------------          
        void removeSharedConsumer(ClientConsumer c) throws JMSException {
                Log.log(""+this+"->removeSharedConsumer(consumer="+c+")");
                sharedQueue.removeConsumer(c);
        }
  
        //Used to put a message that was added previously to the queue, back in the 
queue
        public void restoreMessage(SpyMessage mes, String queueId) throws JMSException
        {
                Log.log(""+this+"->restoreMessage(mes="+mes+",queue="+queueId+")");
                ExclusiveQueue eq = getExclusiveQueue(queueId);         
                eq.restoreMessage(mes);         
        }
  
        public String toString() {
                return "JMSDestination:"+destination;
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/server/SharedQueue.java
  
  Index: SharedQueue.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import javax.jms.DeliveryMode;
  
  import org.spydermq.*;
  import org.spydermq.persistence.SpyMessageLog;
  
  import java.util.Iterator;
  import java.util.Hashtable;
  import java.util.LinkedList;
  import java.util.HashMap;
  import java.util.TreeSet;
  
  /**
   *    This class is a message queue which allows sending a single message
   *  to multiple consumers.
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SharedQueue implements Task, AbstractQueue {
  
        //List of Pending messages
        private TreeSet messages;
        //The JMSServer object
        JMSServer server;
        //DistributedConnection objs that have "registered" to this Destination
        private LinkedList consumers;
  
        // Constructor ---------------------------------------------------         
        SharedQueue(JMSServer server) throws JMSException
        {
                this.server=server;
                consumers=new LinkedList();
                messages=new TreeSet();                 
        }
  
        public void addMessage(SpyMessage mes, Long txId) throws JMSException
        {
  
                // This task gets run to make the message visible in the queue.
                class AddMessagePostCommitTask implements Runnable {
                        SpyMessage message;
                        
                        AddMessagePostCommitTask(SpyMessage m) {
                                message = m;
                        }
                        
                        public void run() {
                                synchronized (messages) 
                                {
  
                                        //Add the message to the queue
                                        messages.add(message);                  
                                        notifyMessageAvailable();
                                        
                                }                                       
                        }
                }
                
                // The message gets added to the queue after the transaction
                // commits (if the message was transacted)      
                Runnable task = new AddMessagePostCommitTask(mes);
                if( txId == null ) {
                        task.run();
                } else {
                        server.persistenceManager.addPostCommitTask(txId, task);
                }
                
        }
        
        // Package protected ---------------------------------------------          
        public void addConsumer(ClientConsumer consumer) throws JMSException
        {
                //We want to avoid removeSubscriber, addSubscriber or sendOneMessage 
to work concurently
                synchronized (consumers) {
                        consumers.add(consumer);
                }
        }
  
        public void notifyMessageAvailable() {
                
                synchronized (server.taskQueue) {
                        server.taskQueue.addLast(this);
                        server.taskQueue.notify();
                }
                
        }
  
        public void removeConsumer(ClientConsumer consumer) throws JMSException
        {
                synchronized (consumers) {
                        consumers.remove(consumer);
                }
        }
  
        // This will dispatch messages in the queue the the ClientConsumers
        synchronized public void run() throws JMSException 
        {       
                SpyMessage[] job;
                
                synchronized (messages) {
                        
                        if( messages.size() == 0 )
                                return;
                                
                        job=new SpyMessage[messages.size()];
                        job=(SpyMessage[])messages.toArray(job);
                        messages.clear();                       
                }
  
                Iterator iter = consumers.iterator();
                while( iter.hasNext() ) {
                        
                        ClientConsumer consumer = (ClientConsumer)iter.next();
                
                        for( int i=0 ; i < job.length; i++ ) 
                                consumer.addMessage(job[i]);
                                                
                        consumer.notifyMessageAvailable();
                        
                }
                        
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/server/Task.java
  
  Index: Task.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.JMSException;
  
  /**
   * A interface similar to the Runnable interface except this
   * one allows run() to throw a JMSException.
   * 
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public interface Task {
        public void run() throws JMSException;
  }
  
  
  

Reply via email to