User: hiram   
  Date: 00/12/21 14:33:57

  Modified:    src/java/org/spydermq SpyConnection.java
                        SpyMessageConsumer.java SpyQueueConnection.java
                        SpyQueueReceiver.java SpyQueueSession.java
                        SpySession.java SpyTopicSession.java
                        SpyTopicSubscriber.java SpyXAResource.java
  Added:       src/java/org/spydermq SpyConnectionConsumer.java
                        SpyConsumer.java
  Log:
  Added ConnectionConsumer so that work on the ASF (JMS Application Server
  Facilities) part of spyderMQ can start.
  
  Revision  Changes    Path
  1.18      +344 -335  spyderMQ/src/java/org/spydermq/SpyConnection.java
  
  Index: SpyConnection.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnection.java,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -r1.17 -r1.18
  --- SpyConnection.java        2000/12/19 06:43:34     1.17
  +++ SpyConnection.java        2000/12/21 22:33:55     1.18
  @@ -13,6 +13,7 @@
   import javax.jms.JMSException;
   import javax.jms.ConnectionMetaData;
   import javax.jms.ExceptionListener;
  +
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Collection;
  @@ -22,24 +23,24 @@
   import java.io.FileInputStream;
   import java.io.File;
   import java.io.IOException;
  +
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
   import org.spydermq.distributed.interfaces.ConnectionReceiver;
   
  -
   /**
    *   This class implements javax.jms.Connection
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.17 $
  + *   @version $Revision: 1.18 $
    */
  -public class SpyConnection 
  -     implements Connection, Serializable
  -{
  -
  -     // Attributes ----------------------------------------------------
  +public class SpyConnection implements Connection, Serializable {
   
  +     //////////////////////////////////////////////////////////////
  +     // Attributes 
  +     //////////////////////////////////////////////////////////////
  +     
        // This is our connection to the JMS server
        protected DistributedJMSServer provider;
        //This is the clientID
  @@ -57,290 +58,274 @@
        //Is the connection closed ?
        boolean closed;
        //Name of the connectionReceiver class
  -     String crClassName;     
  +     String crClassName;
        //the exceptionListener
        private ExceptionListener exceptionListener;
        // Used to control tranactions
        SpyXAResourceManager spyXAResourceManager;
  +
  +     //////////////////////////////////////////////////////////////
  +     // Constructors
  +     //////////////////////////////////////////////////////////////
   
  -     // Constructor ---------------------------------------------------         
  -     SpyConnection(DistributedJMSServer theServer,String cID,String crCN) throws 
JMSException
  -     {
  +     SpyConnection(DistributedJMSServer theServer, String cID, String crCN) throws JMSException {
                //Set the attributes
                provider = theServer;
  -             destinations=new HashMap();
  -             createdSessions=new HashSet();
  -             distributedConnection=null;
  -             closed=false;
  -             lastMessageID=0;
  -             modeStop=true;
  -             clientID=cID;
  -             crClassName=crCN;
  +             destinations = new HashMap();
  +             createdSessions = new HashSet();
  +             distributedConnection = null;
  +             closed = false;
  +             lastMessageID = 0;
  +             modeStop = true;
  +             clientID = cID;
  +             crClassName = crCN;
                spyXAResourceManager = new SpyXAResourceManager(this);
        }
  -
  -     // Public --------------------------------------------------------
  -
  -     //<DEBUG>
  -     
  -     public int rec=0;
  -     
  -     //</DEBUG>
   
  -     public String getClientID() throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The connection is 
closed");
  +     //////////////////////////////////////////////////////////////
  +     // Public Methods
  +     //////////////////////////////////////////////////////////////
  +
  +     public String getClientID() throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
                return clientID;
        }
   
  +     public void setClientID(String cID) throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +             if (clientID != null)
  +                     throw new IllegalStateException("The connection has already a clientID");
   
  -     public void setClientID(String cID) throws JMSException
  -     {
  -         if (closed) throw new IllegalStateException("The connection is closed");
  -             if (clientID!=null) throw new IllegalStateException("The connection 
has already a clientID");
  +             Log.log("SetClientID(" + clientID + ")");
   
  -             Log.log("SetClientID("+clientID+")");
  -
  -             try {           
  +             try {
                        provider.checkID(cID);
                } catch (JMSException e) {
                        throw e;
                } catch (Exception e) {
  -                     failureHandler(e,"Cannot connect to the JMSServer");
  +                     failureHandler(e, "Cannot connect to the JMSServer");
                }
  -                     
  -             clientID=cID;
  +
  +             clientID = cID;
        }
  - 
  -     public ConnectionMetaData getMetaData() throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The connection is 
closed");                
  -             
  +
  +     public ConnectionMetaData getMetaData() throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +
                return new SpyConnectionMetaData();
        }
   
  -     public ExceptionListener getExceptionListener() throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The connection is 
closed");
  -             if (distributedConnection==null) createReceiver();
  -             
  -             return exceptionListener;
  -     }
  +     public ExceptionListener getExceptionListener() throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +             if (distributedConnection == null)
  +                     createReceiver();
   
  -     public void setExceptionListener(ExceptionListener listener) throws 
JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The connection is 
closed");
  -             if (distributedConnection==null) createReceiver();
  -             
  -             exceptionListener=listener;
  +             return exceptionListener;
        }
   
  -     public void start() throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The connection is 
closed");
  -             if (distributedConnection==null) createReceiver();
  -                     
  -             if (!modeStop) return;
  -             modeStop=false;
  +     public void setExceptionListener(ExceptionListener listener) throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +             if (distributedConnection == null)
  +                     createReceiver();
  +
  +             exceptionListener = listener;
  +     }
  +
  +     public void start() throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +             if (distributedConnection == null)
  +                     createReceiver();
  +
  +             if (!modeStop)
  +                     return;
  +             modeStop = false;
   
  -             Iterator i=destinations.keySet().iterator();
  +             Iterator i = destinations.keySet().iterator();
                while (i.hasNext()) {
  -                     Destination d=(Destination)i.next();
  -                     ConsumerSet ci=(ConsumerSet)destinations.get(d);
  +                     Destination d = (Destination) i.next();
  +                     ConsumerSet ci = (ConsumerSet) destinations.get(d);
   
  -                     if ( ci.getLasListeningState() ) {
  +                     if (ci.getLasListeningState()) {
                                try {
  -                                     
provider.connectionListening(distributedConnection, true,d);
  -                             } catch ( Exception e ) {
  +                                     provider.connectionListening(distributedConnection, true, d);
  +                             } catch (Exception e) {
                                        failureHandler(e, "Cannot contact the JMS server");
                                }
                        }
  -                             
  +
                }
   
  -             changeModeStop(modeStop);               
  +             changeModeStop(modeStop);
        }
   
  -     public void stop() throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The connection is 
closed");
  -             if (distributedConnection==null) createReceiver();
  -             
  -             if (modeStop) return;           
  -             modeStop=true;                          
  +     public void stop() throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +             if (distributedConnection == null)
  +                     createReceiver();
  +
  +             if (modeStop)
  +                     return;
  +             modeStop = true;
   
  -             Iterator i=destinations.keySet().iterator();
  +             Iterator i = destinations.keySet().iterator();
                while (i.hasNext()) {
  -                     Destination d=(Destination)i.next();
  -                     ConsumerSet ci=(ConsumerSet)destinations.get(d);
  +                     Destination d = (Destination) i.next();
  +                     ConsumerSet ci = (ConsumerSet) destinations.get(d);
   
  -                     if ( ci.getLasListeningState() ) {
  +                     if (ci.getLasListeningState()) {
                                try {
  -                                     
provider.connectionListening(distributedConnection, false,d);
  -                             } catch ( Exception e ) {
  +                                     provider.connectionListening(distributedConnection, false, d);
  +                             } catch (Exception e) {
                                        failureHandler(e, "Cannot contact the JMS server");
                                }
                        }
  -                             
  +
                }
   
                changeModeStop(modeStop);
   
        }
   
  -     public synchronized void close() throws JMSException
  -     {               
  -             if (closed) return;
  +     public synchronized void close() throws JMSException {
  +             if (closed)
  +                     return;
   
                //Get an ID / ConnectionReciever
  -             if (distributedConnection==null) createReceiver();
  -             
  +             if (distributedConnection == null)
  +                     createReceiver();
  +
  +             Log.log("Closing sessions");
                //notify his sessions
                synchronized (createdSessions) {
  -                     
  -                     Object[] vect=createdSessions.toArray();
  -                     for(int i=0;i<vect.length;i++) {
  -                             ((SpySession)vect[i]).close();
  +
  +                     Object[] vect = createdSessions.toArray();
  +                     for (int i = 0; i < vect.length; i++) {
  +                             ((SpySession) vect[i]).close();
                        }
  -             
  +
                }
  +             Log.log("Closed sessions");
   
  +             Log.log("Disconnecting from server");
                //Notify the JMSServer that I am closing
                try {
                        provider.connectionClosing(distributedConnection);
                        distributedConnection.close();
                } catch (Exception e) {
  -                     failureHandler(e,"Cannot close properly the connection");
  +                     failureHandler(e, "Cannot close properly the connection");
                }
  -                             
  +             Log.log("Disconnected from server");
  +
                // Only set the closed flag after all the objects that depend 
                // on this connection have been closed.
  -             closed=true;    
  -     } 
  +             closed = true;
  +     }
   
        //called by a TemporaryDestination which is going to be deleted()
  -     public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The connection is 
closed");
  -             if (distributedConnection==null) createReceiver();
  +     public void deleteTemporaryDestination(SpyDestination dest) throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +             if (distributedConnection == null)
  +                     createReceiver();
   
  -             Log.log("SpyConnection: deleteDestination(dest="+dest.toString()+")");
  -             
  +             Log.log("SpyConnection: deleteDestination(dest=" + dest.toString() + ")");
  +
                try {
  -                     
  +
                        //Remove it from the destinations list
                        synchronized (destinations) {
  -                             HashMap newMap=(HashMap)destinations.clone();   
  +                             HashMap newMap = (HashMap) destinations.clone();
                                newMap.remove(dest);
  -                             destinations=newMap;
  +                             destinations = newMap;
                        }
  -                     
  +
                        //Notify its sessions that this TemporaryDestination is going to be deleted()
                        //We could do that only on the Sessions "linked" to this Destination
                        synchronized (createdSessions) {
  -                             
  -                             Iterator i=createdSessions.iterator();
  +
  +                             Iterator i = createdSessions.iterator();
                                while (i.hasNext()) {
  -                                     
((SpySession)i.next()).deleteTemporaryDestination(dest);
  +                                     ((SpySession) i.next()).deleteTemporaryDestination(dest);
                                }
   
                        }
  -                     
  +
                        //Ask the broker to delete() this TemporaryDestination
                        provider.deleteTemporaryDestination(distributedConnection, dest);
  -                     
  +
                } catch (Exception e) {
  -                     failureHandler(e,"Cannot delete the TemporaryDestination");
  +                     failureHandler(e, "Cannot delete the TemporaryDestination");
                }
  -             
  +
        }
  -     
  +
        //Get a new messageID (creation of a new message)
  -     String getNewMessageID() throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The connection is 
closed");                
  -             if (distributedConnection==null) createReceiver();
  -                                                                      
  -             return clientID+"-"+(lastMessageID++);
  +     String getNewMessageID() throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +             if (distributedConnection == null)
  +                     createReceiver();
  +
  +             return clientID + "-" + (lastMessageID++);
        }
  -     
  +
        //notify his sessions that he has changed his stopped mode
  -     synchronized void changeModeStop(boolean newValue)
  -     {
  +     synchronized void changeModeStop(boolean newValue) {
                synchronized (createdSessions) {
  -                     
  -                     Iterator i=createdSessions.iterator();
  +
  +                     Iterator i = createdSessions.iterator();
                        while (i.hasNext()) {
  -                             ((SpySession)i.next()).notifyStopMode(newValue);
  +                             ((SpySession) i.next()).setStopMode(newValue);
                        }
   
                }
  -             
  +
        }
  -     
  +
        //Called by a session when it is closing
  -     void sessionClosing(SpySession who)
  -     {
  -             synchronized (createdSessions) 
  -             {                       
  +     void sessionClosing(SpySession who) {
  +             synchronized (createdSessions) {
                        createdSessions.remove(who);
                }
   
                //This session should not be in the "destinations" object anymore. 
                //We could check this, though
  -     }
  -     
  -     
  -     // Protected -------------------------------------------------------
  -
  -     //create a new Distributed object which receives the messages for this 
connection
  -     protected void createReceiver() throws JMSException
  -     {
  -             try {
  -                     if (clientID==null) askForAnID();
  -                     
  -                     org.spydermq.distributed.interfaces.ConnectionReceiverSetup cr 
= 
(org.spydermq.distributed.interfaces.ConnectionReceiverSetup)Class.forName(crClassName).newInstance();
  -                     cr.setConnection(this);
  -                     distributedConnection=new 
SpyDistributedConnection(clientID,cr);
  -                     
distributedConnection.setConnectionReceiver(cr.createClient()); 
  -                     
  -                     provider.setSpyDistributedConnection(distributedConnection);
  -             } catch (Exception e) {
  -                     failureHandler(e,"Cannot create a ConnectionReceiver");
  -             }
  -     }
  -
  -     //ask the JMS server for a new ID
  -     protected void askForAnID() throws JMSException
  -     {
  -             try {
  -                     clientID=provider.getID();
  -             } catch (Exception e) {
  -                     failureHandler(e,"Cannot get an ID");
  -             }               
        }
  -
  -     
  -     public void failureHandler(Exception e,String reason) throws JMSException
  -     {
  +     public void failureHandler(Exception e, String reason) throws JMSException {
                e.printStackTrace();
  -             
  -             JMSException excep=new JMSException(reason);
  +
  +             JMSException excep = new JMSException(reason);
                excep.setLinkedException(e);
  -             
  -             if (exceptionListener!=null) {
  +
  +             if (exceptionListener != null) {
                        synchronized (exceptionListener) {
                                exceptionListener.onException(excep);
                        }
                }
  -             
  +
                throw excep;
        }
  -     
  +
        public DistributedJMSServer getProvider() {
                return provider;
        }
  -     
  +
  +     //ask the JMS server for a new ID
  +     protected void askForAnID() throws JMSException {
  +             try {
  +                     clientID = provider.getID();
  +             } catch (Exception e) {
  +                     failureHandler(e, "Cannot get an ID");
  +             }
  +     }
  +
        // The ConsumerSet inner class is used by:
        //
        //              addConsumer()
  @@ -350,137 +335,206 @@
        //              pickListeningConsumer()
        //
        class ConsumerSet extends HashSet {
  -             boolean lasListeningState=false;
  -             
  +             boolean lasListeningState = false;
  +
                boolean getLasListeningState() {
                        return lasListeningState;
                }
  -                     
  -             boolean listenStateChanged() { 
  +
  +             boolean listenStateChanged() {
                        boolean t = false;
  -                     
  +
                        Iterator iter = iterator();
  -                     while( iter.hasNext() ) {
  -                             SpyMessageConsumer c = (SpyMessageConsumer)iter.next();
  -                             if( c.isListening() ) {
  +                     while (iter.hasNext()) {
  +                             SpyConsumer c = (SpyConsumer) iter.next();
  +                             if (c.isListening()) {
                                        t = true;
                                        break;
                                }
                        }
  -                     
  -                     if( t == lasListeningState ) {
  +
  +                     if (t == lasListeningState) {
                                return false;
                        }
  +
                        lasListeningState = t;
                        return true;
                }
        }
  -             
  -     //A new Consumer has been created for the Destination dest
  -     void addConsumer(Destination dest, SpyMessageConsumer consumer) throws 
JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The connection is 
closed");                
  -             if (distributedConnection==null) createReceiver();
  -                                                                      
  -             Log.log("Connection: addConsumer(dest="+dest.toString()+")");
   
  -             try {
  +     /**
  +      * Called whenever a consumer changes his listening state on a destination.
  +      * We see if the consumer change, changed the overall listening state for the destination.
  +      * Creation date: (11/16/2000 2:20:22 PM)
  +      * @return org.spydermq.distributed.interfaces.DistributedJMSServer
  +      */
  +     public void listenerChange(Destination d) throws JMSException {
   
  -                     synchronized (destinations) {
  -                             
  -                             ConsumerSet 
consumerSet=(ConsumerSet)destinations.get(dest);
  -                             
  -                             if (consumerSet==null) {                        
  -                                     consumerSet=new ConsumerSet();
  -                                     consumerSet.add(consumer);
  -                                     HashMap 
newDestinations=(HashMap)destinations.clone();
  -                                     newDestinations.put(dest,consumerSet);
  -                                     destinations=newDestinations;
  -                                     provider.subscribe(distributedConnection,dest);
  -                             } else {                        
  -                                     consumerSet.add(consumer);
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +             if (distributedConnection == null)
  +                     createReceiver();
  +
  +             ConsumerSet ci = (ConsumerSet) destinations.get(d);
  +             
  +             if( ci == null ) 
  +                     return;                 
  +             if (ci.listenStateChanged()) {
  +                     try {
  +                             if (ci.getLasListeningState()) {
  +                                     provider.connectionListening(distributedConnection, true, d);
  +                             } else {
  +                                     provider.connectionListening(distributedConnection, false, d);
                                }
  +                     } catch (Exception e) {
  +                             failureHandler(e, "Cannot contact the JMS server");
                        }
  +             }
   
  +     }
  +
  +     /**
  +      * @return org.spydermq.distributed.interfaces.DistributedJMSServer
  +      */
  +     SpyMessage queueReceive(Queue queue, long wait) throws JMSException {
  +
  +             try {
  +                     return provider.queueReceive(distributedConnection, queue, wait);
                } catch (Exception e) {
  -                     failureHandler(e,"Cannot subscribe to this Destination");
  -             }       
  -             
  +                     failureHandler(e, "Cannot create a ConnectionReceiver");
  +                     return null;
  +             }
        }
  +
  +     ////////////////////////////////////////////////////////////////////
  +     // Protected
  +     ////////////////////////////////////////////////////////////////////
        
  +     //create a new Distributed object which receives the messages for this connection
  +     protected void createReceiver() throws JMSException {
  +             try {
  +                     if (clientID == null)
  +                             askForAnID();
   
  -     //A consumer does not need to recieve the messages from a Destination 
  -     void removeConsumer(Destination dest, SpyMessageConsumer who) throws 
JMSException {
  -             
  -             if (distributedConnection==null) createReceiver();
  -             
  -             Log.log("Connection: removeSession(dest="+dest.toString()+")");
  -             
  +                     org.spydermq.distributed.interfaces.ConnectionReceiverSetup cr =
  +                             (org.spydermq.distributed.interfaces.ConnectionReceiverSetup) Class.forName(crClassName).newInstance();
  +                     cr.setConnection(this);
  +                     distributedConnection = new SpyDistributedConnection(clientID, cr);
  +                     distributedConnection.setConnectionReceiver(cr.createClient());
  +
  +                     provider.setSpyDistributedConnection(distributedConnection);
  +             } catch (Exception e) {
  +                     failureHandler(e, "Cannot create a ConnectionReceiver");
  +             }
  +     }
  +
  +     // used to acknowledge a message
  +     protected void send(SpyAcknowledgementItem item) throws JMSException {
  +             try {
  +                     provider.acknowledge(distributedConnection, item);
  +             } catch (Exception e) {
  +                     failureHandler(e, "Cannot acknowlege a message.");
  +             }
  +     }
  +
  +     //Send a message to the provider
  +     void sendToServer(SpyMessage mes) throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +             if (distributedConnection == null)
  +                     createReceiver();
  +
  +             try {
  +
  +                     provider.addMessage(distributedConnection, mes);
  +
  +             } catch (Exception e) {
  +                     failureHandler(e, "Cannot send a message to the JMS provider");
  +             }
  +     }
  +
  +     // Used to commit/rollback a transaction.
  +     protected void send(TransactionRequest transaction) throws JMSException {
  +
  +             try {
  +                     provider.transact(distributedConnection, transaction);
  +             } catch (Exception e) {
  +                     failureHandler(e, "Cannot process a transaction.");
  +             }
  +
  +     }
  +
  +     //A new Consumer has been created for the Destination dest
  +     void addConsumer(Destination dest, SpyConsumer consumer) throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +             if (distributedConnection == null)
  +                     createReceiver();
  +
  +             Log.log("Connection: addConsumer(dest=" + dest.toString() + ")");
  +
                try {
  -                     
  +
                        synchronized (destinations) {
  -                             
  -                             ConsumerSet 
consumerSet=(ConsumerSet)destinations.get(dest);
  -                             
  -                             if (consumerSet!=null) {
  -                                     boolean empty=consumerSet.remove(who);
  -                                     if (empty) {
  -                                             HashMap 
newDestinations=(HashMap)destinations.clone();
  -                                             newDestinations.remove(dest);
  -                                             destinations=newDestinations;
  -                                             
provider.unsubscribe(distributedConnection, dest);
  -                                     } 
  +
  +                             ConsumerSet consumerSet = (ConsumerSet) destinations.get(dest);
  +
  +                             if (consumerSet == null) {
  +                                     consumerSet = new ConsumerSet();
  +                                     consumerSet.add(consumer);
  +                                     HashMap newDestinations = (HashMap) destinations.clone();
  +                                     newDestinations.put(dest, consumerSet);
  +                                     destinations = newDestinations;
  +                                     provider.subscribe(distributedConnection, dest);
                                } else {
  -                                     //this should not happen
  -                                     HashMap 
newDestinations=(HashMap)destinations.clone();
  -                                     newDestinations.remove(dest);
  -                                     destinations=newDestinations;
  -                                     provider.unsubscribe(distributedConnection, 
dest);
  +                                     consumerSet.add(consumer);
                                }
  -                             
                        }
  -                     
  +
                } catch (Exception e) {
  -                     failureHandler(e,"Cannot unsubscribe to this destination");
  +                     failureHandler(e, "Cannot subscribe to this Destination");
                }
   
  -     }       
  +     }
   
  -             
        //Gets all the consumers subscribed to a destination
  -     public SpyMessageConsumer[] getConsumers(Destination dest) throws JMSException 
{
  -             
  -             if (closed) throw new IllegalStateException("The connection is 
closed");                
  -             if (distributedConnection==null) createReceiver();
  -     
  +     public SpyConsumer[] getConsumers(Destination dest) throws JMSException {
  +
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +             if (distributedConnection == null)
  +                     createReceiver();
  +
                synchronized (destinations) {
  -                     ConsumerSet consumerSet=(ConsumerSet)destinations.get(dest);   
                         
  -                     if (consumerSet==null || consumerSet.size()==0) 
  +                     ConsumerSet consumerSet = (ConsumerSet) destinations.get(dest);
  +                     if (consumerSet == null || consumerSet.size() == 0)
                                return null;
  -                     
  -                     SpyMessageConsumer rc[]=new 
SpyMessageConsumer[consumerSet.size()];
  -                     return (SpyMessageConsumer[])consumerSet.toArray(rc);
  +
  +                     SpyConsumer rc[] = new SpyConsumer[consumerSet.size()];
  +                     return (SpyConsumer[]) consumerSet.toArray(rc);
                }
  -                                     
  +
        }
   
        //Gets the first consumer that is listening to a destination.   
  -     public SpyMessageConsumer pickListeningConsumer(Destination dest) throws 
JMSException {
  -             
  -             if (closed) throw new IllegalStateException("The connection is 
closed");                
  -             if (distributedConnection==null) createReceiver();
  -                                                                      
  +     public SpyConsumer pickListeningConsumer(Destination dest) throws JMSException 
{
  +
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +             if (distributedConnection == null)
  +                     createReceiver();
  +
                synchronized (destinations) {
  -                     
  -                     ConsumerSet consumerSet=(ConsumerSet)destinations.get(dest);
  -                     
  -                     if (consumerSet==null || consumerSet.size()==0) {
  +
  +                     ConsumerSet consumerSet = (ConsumerSet) destinations.get(dest);
  +
  +                     if (consumerSet == null || consumerSet.size() == 0) {
                                return null;
  -                     } else {                        
  +                     } else {
                                Iterator i = consumerSet.iterator();
  -                             while( i.hasNext() ) {
  -                                     SpyMessageConsumer c = 
(SpyMessageConsumer)i.next();
  -                                     if( c.isListening() || c.waitInReceive ) {
  +                             while (i.hasNext()) {
  +                                     SpyConsumer c = (SpyConsumer) i.next();
  +                                     if (c.isListening() || c.isReceiving()) {
                                                return c;
                                        }
                                }
  @@ -488,82 +542,37 @@
                }
   
                return null;
  -             
  -     }
  -             
  -     /**
  -      * Called whenever a consumer changes his listening state on a destination.
  -      * We see if the consumer change, changed the overall listening state for the 
destination.
  -      * Creation date: (11/16/2000 2:20:22 PM)
  -      * @return org.spydermq.distributed.interfaces.DistributedJMSServer
  -      */             
  -     public void listenerChange(Destination d) throws JMSException {
  -             
  -             if (closed) throw new IllegalStateException("The connection is 
closed");
  -             if (distributedConnection==null) createReceiver();
  -                     
  -             ConsumerSet ci=(ConsumerSet)destinations.get(d);
  -             if( ci.listenStateChanged() ) {
  -                     try {
  -                             if ( ci.getLasListeningState() ) {
  -                                     
provider.connectionListening(distributedConnection,true,d);
  -                             } else {
  -                                     
provider.connectionListening(distributedConnection,false,d);
  -                             }
  -                     } catch ( Exception e ) {
  -                             failureHandler(e, "Cannot contact the JMS server");
  -                     }
  -             }
  -             
  -     }       
  -     
  -     
  -     /**
  -      * @return org.spydermq.distributed.interfaces.DistributedJMSServer
  -      */              
  -     SpyMessage queueReceive(Queue queue, long wait) throws JMSException {
  -             
  -             try {
  -                     return provider.queueReceive(distributedConnection, queue, 
wait);
  -             } catch (Exception e) {
  -                     failureHandler(e,"Cannot create a ConnectionReceiver");
  -                     return null;
  -             }
  +
        }
  -     
  -     // used to acknowledge a message
  -     protected void send(SpyAcknowledgementItem item) throws JMSException 
  -     {
  -             try {
  -                     provider.acknowledge(distributedConnection, item);
  -             } catch (Exception e) {
  -                     failureHandler(e,"Cannot acknowlege a message.");
  -             }               
  -     }       
  -                     
  -     //Send a message to the provider
  -     void sendToServer(SpyMessage mes) throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The connection is 
closed");                
  -             if (distributedConnection==null) createReceiver();
   
  +     //A consumer does not need to receive the messages from a Destination
  +     void removeConsumer(Destination dest, SpyConsumer who) throws JMSException {
  +
  +             if (distributedConnection == null)
  +                     createReceiver();
  +
  +             Log.log("Connection: removeSession(dest=" + dest.toString() + ")");
  +
                try {
  +
  +                     synchronized (destinations) {
  +
  +                             ConsumerSet consumerSet = (ConsumerSet) 
destinations.get(dest);
  +                             if (consumerSet == null) 
  +                                     throw new RuntimeException("Destination does 
not have any consumers.");
                                
  -                     provider.addMessage(distributedConnection, mes);
  -                                             
  -             } catch (Exception e) {
  -                     failureHandler(e,"Cannot send a message to the JMS provider");
  -             }
  -     }
  +                             consumerSet.remove(who);
   
  -     // Used to commit/rollback a transaction.
  -     protected void send(TransactionRequest transaction) throws JMSException  {
  -             
  -             try {
  -                     provider.transact(distributedConnection, transaction);
  +                             if ( consumerSet.isEmpty() ) {
  +                                     destinations.remove(dest);
  +                                     provider.unsubscribe(distributedConnection, 
dest);
  +                             }
  +
  +                     }
  +
                } catch (Exception e) {
  -                     failureHandler(e,"Cannot process a transaction.");
  +                     failureHandler(e, "Cannot unsubscribe to this destination");
                }
  -             
  +
        }
   }
  
  
  
  1.7       +236 -121  spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- SpyMessageConsumer.java   2000/12/19 06:43:34     1.6
  +++ SpyMessageConsumer.java   2000/12/21 22:33:55     1.7
  @@ -24,12 +24,9 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.6 $
  + *   @version $Revision: 1.7 $
    */
  -public class SpyMessageConsumer 
  -     implements MessageConsumer
  -{
  -     // Attributes ----------------------------------------------------
  +abstract public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
   
        //Link to my session
        public SpySession session;
  @@ -41,114 +38,208 @@
        public Selector selector;
        //The message selector
        public String messageSelector;
  -
        //List of Pending messages (not yet delivered)
        LinkedList messages;
        //Is the consumer sleeping in a receive() ?
        boolean waitInReceive;
  -     public Destination destination; 
  -     
  +     //The destination this consumer is getting messages from
  +     SpyDestination destination;
        //Am I in noLocal mode ?
        boolean noLocal;
  -     
  +
        // Constructor ---------------------------------------------------
  -        
  -     SpyMessageConsumer(SpySession s)
  -     {
  -             session=s;
  -             messageListener=null;
  -             closed=false;
  -             selector=null;
  -             messageSelector=null;
  -             messages=new LinkedList();
  -             waitInReceive=false;
  -     }
  -     
  -             
  -     
  +
  +     SpyMessageConsumer(SpySession s, SpyDestination dest) {
  +             session = s;
  +             destination = dest;
  +             messageListener = null;
  +             closed = false;
  +             selector = null;
  +             messageSelector = null;
  +             messages = new LinkedList();
  +             waitInReceive = false;
  +     }
  +
        // Public --------------------------------------------------------
  +
  +     public String getMessageSelector() throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The MessageConsumer is 
closed");
   
  -     public String getMessageSelector() throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  -                             
                return messageSelector;
        }
   
  -     public MessageListener getMessageListener() throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  -             
  +     public MessageListener getMessageListener() throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The MessageConsumer is 
closed");
  +
                return messageListener;
        }
  +
  +     public void setMessageListener(MessageListener listener) throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The MessageConsumer is 
closed");
  +             if (waitInReceive)
  +                     throw new JMSException("This MessageConsumer is waiting in 
receive() !");
  +
  +             messageListener = listener;
   
  -     public void setMessageListener(MessageListener listener) throws JMSException
  -     {       
  -             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  -             if (waitInReceive) throw new JMSException("This MessageConsumer is 
waiting in receive() !");
  -             //Job is done in the inherited classes
  -             //The QueueReceiver object need to notify their session / connection / 
the broker               
  -     }
  -
  -     public Message receive() throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  -             if (messageListener!=null) throw new JMSException("A message listener 
is already registered");
  -             //Job is done in the inherited classes
  -             //The QueueReceiver object need to notify their session / connection / 
the broker
  -             return null;
  -     }
  -
  -     public Message receive(long timeOut) throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  -             if (messageListener!=null) throw new JMSException("A message listener 
is already registered");
  -             //Job is done in the inherited classes
  -             //The QueueReceiver object need to notify their session / connection / 
the broker
  -             return null;
  -     }
  -
  -     public Message receiveNoWait() throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  -             if (messageListener!=null) throw new JMSException("A message listener 
is already registered");
  -             //Job is done in the inherited classes
  -             //The QueueReceiver object need to notify their session / connection / 
the broker
  -             return null;
  -     }
  -
  -     public synchronized void close() throws JMSException
  -     {
  -             //Job is done in the inherited classes
  -             //The QueueReceiver object need to notify their session / connection / 
the broker
  -             throw new RuntimeException("pure virtual call");
  +             //session.run();                
        }
  -     
  +
  +     public Message receive() throws JMSException {
  +
  +             if (closed)
  +                     throw new IllegalStateException("The MessageConsumer is 
closed");
  +             if (messageListener != null)
  +                     throw new JMSException("A message listener is already 
registered");
  +             if (destination == null)
  +                     throw new JMSException("No assigned destination.");
  +
  +             waitInReceive = true;
  +
  +             if (!isListening() && this instanceof SpyQueueReceiver)
  +                     session.connection.queueReceive((SpyQueue) destination, 0);
  +
  +             synchronized (messages) {
  +
  +                     try {
  +                             while (true) {
  +                                     if (closed)
  +                                             return null;
  +                                     if (!session.modeStop) {
  +                                             Message mes = getMessage();
  +                                             if (mes != null)
  +                                                     return mes;
  +                                     } else
  +                                             Log.notice("the connection is stopped 
!");
  +
  +                                     Log.log("SpyMessageConsumer: receive in 
messages.wait()");
  +                                     messages.wait();
  +                             }
  +                     } catch (InterruptedException e) {
  +                             JMSException newE = new JMSException("Receive 
interupted");
  +                             newE.setLinkedException(e);
  +                             throw newE;
  +                     } finally {
  +                             waitInReceive = false;
  +                     }
  +             }
  +
  +     }
  +
  +     public Message receive(long timeOut) throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The MessageConsumer is 
closed");
  +             if (messageListener != null)
  +                     throw new JMSException("A message listener is already 
registered");
  +             if (destination == null)
  +                     throw new JMSException("No assigned destination.");
  +
  +             if (timeOut == 0)
  +                     return receive();
  +
  +             long endTime = System.currentTimeMillis() + timeOut;
  +
  +             waitInReceive = true;
  +
  +             if (!isListening() && this instanceof SpyQueueReceiver)
  +                     session.connection.queueReceive((SpyQueue) destination, 
timeOut);
  +
  +             synchronized (messages) {
  +
  +                     try {
  +
  +                             while (true) {
  +
  +                                     if (closed)
  +                                             return null;
  +
  +                                     if (!session.modeStop) {
  +                                             Message mes = getMessage();
  +                                             if (mes != null) {
  +                                                     return mes;
  +                                             }
  +
  +                                     } else
  +                                             Log.log("the connection is stopped !");
  +
  +                                     long att = endTime - 
System.currentTimeMillis();
  +                                     if (att <= 0) {
  +                                             return null;
  +                                     }
  +
  +                                     messages.wait(att);
  +                             }
  +
  +                     } catch (InterruptedException e) {
  +                             JMSException newE = new JMSException("Receive 
interupted");
  +                             newE.setLinkedException(e);
  +                             throw newE;
  +                     } finally {
  +                             waitInReceive = false;
  +                     }
  +             }
  +
  +     }
  +
  +     public Message receiveNoWait() throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The MessageConsumer is 
closed");
  +             if (messageListener != null)
  +                     throw new JMSException("A message listener is already 
registered");
  +             if (destination == null)
  +                     throw new JMSException("No assigned destination.");
  +
  +             waitInReceive = true;
  +             try {
  +
  +                     if (!isListening() && this instanceof SpyQueueReceiver) {
  +
  +                             if (session.modeStop)
  +                                     return null;
  +                             return session.connection.queueReceive((SpyQueue) 
destination, -1);
  +                     }
  +
  +                     synchronized (messages) {
  +                             while (true) {
  +                                     if (session.modeStop)
  +                                             return null;
  +                                     return getMessage();
  +                             }
  +                     }
  +
  +             } finally {
  +                     waitInReceive = false;
  +             }
  +     }
  +
  +     abstract public void close() throws JMSException;
  +
        //Package protected - Not part of the spec
  -     
  -     void setSelector(Selector selector,String messageSelector)
  -     {
  -             this.selector=selector;
  -             this.messageSelector=messageSelector;
  -     }
  -     
  -     SpyMessage getMessage()
  -     {
  +
  +     void setSelector(Selector selector, String messageSelector) {
  +             this.selector = selector;
  +             this.messageSelector = messageSelector;
  +     }
  +
  +     SpyMessage getMessage() {
                synchronized (messages) {
  -                             
  +
                        while (true) {
   
                                try {
  -                                     if (messages.size()==0) return null;
  -                             
  -                                     SpyMessage 
mes=(SpyMessage)messages.removeFirst();
  -                             
  +                                     if (messages.size() == 0)
  +                                             return null;
  +
  +                                     SpyMessage mes = (SpyMessage) 
messages.removeFirst();
  +
                                        if (mes.isOutdated()) {
                                                Log.notice("SessionQueue: I dropped a 
message (timeout)");
                                                continue;
                                        }
  -                                             
  -                                     if (selector!=null) {
  +
  +                                     if (selector != null) {
                                                if (!selector.test(mes)) {
                                                        Log.log("SessionQueue: I 
dropped a message (selector)");
                                                        continue;
  @@ -156,76 +247,100 @@
                                                        Log.log("SessionQueue: 
selector evaluates TRUE");
                                                }
                                        }
  -                                     
  +
                                        if (noLocal && 
mes.producerClientId.equals(session.connection.clientID)) {
                                                Log.notice("SessionQueue: I dropped a 
message (noLocal)");
                                                continue;
                                        }
  -                                             
  +
                                        //the SAME Message object is put in different 
SessionQueues
                                        //when we deliver it, we have to clone() it to 
insure independance
  -                                     SpyMessage message=mes.myClone();
  +                                     SpyMessage message = mes.myClone();
                                        message.setSpySession(session);
   
  -                                     if( session.transacted ) {
  +                                     if (session.transacted) {
                                                
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, 
message);
  -                                     } else if( 
session.acknowledgeMode==session.AUTO_ACKNOWLEDGE || 
session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
  +                                     } else if (session.acknowledgeMode == 
session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE) {
                                                message.doAcknowledge();
                                        }
   
                                        return message;
  -                                                                             
  +
                                } catch (Exception e) {
                                        e.printStackTrace();
                                }
   
                        }
  -                     
  +
                }
   
        }
  -     
  -     public void addMessage(SpyMessage mes) throws JMSException
  -     {
  -             //Set the session in the message so it can acknowlege
  -             mes.setSpySession(session);
  -                     
  +
  +     public void addMessage(SpyMessage mes) throws JMSException {
                synchronized (messages) {
                        //Add a message to the queue
  -                     messages.addLast(mes);                  
  +                     messages.addLast(mes);
                }
        }
  -     
  -     
  +
        public boolean deliverMessage() throws JMSException {
  -             
  +
                synchronized (messages) {
  -                     if (messages.size()==0) 
  +                     if (messages.size() == 0)
                                return false;
  -                                     
  -                     if (messageListener==null) {
  -                             if (!waitInReceive) 
  +
  +                     if (messageListener == null) {
  +                             if (!waitInReceive) {
  +
  +                                     // If no Listener and No receiver is waiting 
for a message
  +                                     // Then we neg ack the message back to the 
server in the queue case.
  +                                     if (this instanceof SpyQueueReceiver) {
  +
  +                                             SpyMessage mes = getMessage();
  +                                             while (mes == null) {
  +
  +                                                     Log.log("Got unrequested 
message, sending NACK for: " + mes);
  +                                                     SpyAcknowledgementItem item = 
new SpyAcknowledgementItem();
  +                                                     item.jmsDestination = 
mes.getJMSDestination();
  +                                                     item.jmsMessageID = 
mes.getJMSMessageID();
  +                                                     item.isAck = false;
  +
  +                                                     session.connection.send(item);
  +
  +                                                     mes = getMessage();
  +                                             }
  +
  +                                     }
                                        return false;
  +                             }
                                messages.notify();
  +
                        } else {
  -                             SpyMessage mes=getMessage();
  -                             if (mes==null) 
  +
  +                             SpyMessage mes = getMessage();
  +                             if (mes == null)
                                        return false;
  -                                     
  +
                                messageListener.onMessage(mes);
  -                             if( session.transacted ) {
  +                             if (session.transacted) {
                                        
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, mes);
  -                             } else if( 
session.acknowledgeMode==session.AUTO_ACKNOWLEDGE || 
session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
  +                             } else if (session.acknowledgeMode == 
session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE) {
                                        mes.doAcknowledge();
  -                             }       
  -                             
  -                     }               
  -             }               
  +                             }
  +
  +                     }
  +             }
  +
                return true;
  -     }       
  +     }
  +
  +     abstract public boolean isListening();
  +
  +     public boolean isReceiving() {
  +             return waitInReceive;
  +     }
   
  -             
  -     public boolean isListening() {
  -             return false;
  +     public void processMessages() throws JMSException {
  +             session.mutex.notifyLock();
        }
   }
  
  
  
  1.3       +4 -3      spyderMQ/src/java/org/spydermq/SpyQueueConnection.java
  
  Index: SpyQueueConnection.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueConnection.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyQueueConnection.java   2000/12/12 05:58:58     1.2
  +++ SpyQueueConnection.java   2000/12/21 22:33:55     1.3
  @@ -13,7 +13,9 @@
   import javax.jms.ServerSessionPool;
   import javax.jms.TemporaryQueue;
   import javax.jms.Queue;
  +
   import java.io.Serializable;
  +
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
   
   /**
  @@ -22,7 +24,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class SpyQueueConnection 
        extends SpyConnection 
  @@ -60,8 +62,7 @@
                if (closed) throw new IllegalStateException("The connection is 
closed");                
                if (distributedConnection==null) createReceiver();
                                                                                
  -             //Not impelemted yet
  -             return null;
  +             return new SpyConnectionConsumer(this, queue, messageSelector, 
sessionPool, maxMessages);
        }
   
   
  
  
  
  1.7       +33 -170   spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
  
  Index: SpyQueueReceiver.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- SpyQueueReceiver.java     2000/12/19 06:43:34     1.6
  +++ SpyQueueReceiver.java     2000/12/21 22:33:55     1.7
  @@ -18,204 +18,67 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.6 $
  + *   @version $Revision: 1.7 $
    */
  -public class SpyQueueReceiver 
  -     extends SpyMessageConsumer 
  -     implements QueueReceiver
  -{
  +public class SpyQueueReceiver extends SpyMessageConsumer implements QueueReceiver {
        // Attributes ----------------------------------------------------
   
        //The queue I registered
        private Queue queue;
        //Mode of this QueueReceiver
        boolean listening;
  -     
  +
        // Constructor ---------------------------------------------------
  -        
  -     SpyQueueReceiver(SpyQueueSession session,Queue queue) 
  -     {
  -             super(session);
  -             this.destination=queue;
  -             this.queue=queue;
  -             listening=false;
  +
  +     SpyQueueReceiver(SpyQueueSession session, Queue queue) {
  +             super(session, (SpyQueue) queue);
  +             this.queue = queue;
  +             listening = false;
        }
   
        // Public --------------------------------------------------------
  +
  +     public Queue getQueue() throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The MessageConsumer is 
closed");
   
  -     public Queue getQueue() throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  -                             
                return queue;
        }
   
  -     public void close() throws JMSException
  -     {       
  -             synchronized (messages) {
  -                     if (closed) return;
  -                     closed=true;
  +     public void close() throws JMSException {
   
  -                     setListening(false);
  -             }
  -     }
  +             synchronized (messages) {
  +                     if (closed)
  +                             return;
   
  -     //Overrides MessageConsumer
  -     public Message receive() throws JMSException {
  -             super.receive();
  +                     if (queue != null)
  +                             session.removeConsumer(queue, this);
   
  +                     setListening(false);
   
  -             //if the client follows the specification [4.4.6], he cannot use this 
session
  -             //to asynchronously receive a message or receive() in another thread.
  -             //If a message is already pending for this session, we can immediatly 
deliver it 
  -             synchronized (messages) {
  -                             
  -                     waitInReceive = true;                   
  -                     session.connection.queueReceive(queue, 0);
  -
  -                     try {
  -                     
  -                             while (true) {
  -
  -                                     if (!session.modeStop) {
  -
  -                                             Message mes = getMessage();
  -                                             if (mes != null) 
  -                                                     return mes;
  -
  -                                     } else
  -                                             Log.log("the connection is stopped !");
  -
  -                                     messages.wait();
  -                             }
  -                     
  -                     } catch (InterruptedException e) {
  -                             JMSException newE = new JMSException("Receive 
interupted");
  -                             newE.setLinkedException(e);
  -                             throw newE;
  -                     } finally {
  -                             waitInReceive=false;    
  -                     }
  -             }
  -     }
  +                     if (waitInReceive && messageListener == null) {
  +                             //A consumer could be waiting in receive()
  +                             messages.notify();
  +                     }
   
  -     public Message receive(long timeOut) throws JMSException
  -     {
  -             super.receive(timeOut);
  -                     
  -             if (timeOut==0) return receive();               
  -             long endTime=System.currentTimeMillis()+timeOut;
  -
  -             
  -             //if the client respects the specification [4.4.6], he cannot use this 
session
  -             //to asynchronously receive a message or receive() from another thread.
  -             //If a message is already pending for this session, we can deliver it 
  -             synchronized (messages) {
  -                     
  -                     waitInReceive=true;
  -                     session.connection.queueReceive(queue,timeOut);
  -                     
  -                     try {
  -                             
  -                             while (true) {
  -                                             
  -                                     if (!session.modeStop) {
  -                                             Message mes=getMessage();
  -                                             if (mes!=null) {
  -                                                     return mes;
  -                                             }
  -                                             
  -                                     } else 
  -                                             Log.log("the connection is stopped !");
  -                                     
  -                                     long att=endTime-System.currentTimeMillis();
  -                                     if (att<=0) {
  -                                             return null;                           
 
  -                                     }
  -                             
  -                                     messages.wait(att);
  -                             }
  -                             
  -                     } catch (InterruptedException e) {
  -                             JMSException newE = new JMSException("Receive 
interupted");
  -                             newE.setLinkedException(e);
  -                             throw newE;
  -                     } finally {
  -                             waitInReceive=false;    
  -                     }
  +                     closed = true;
                }
  -     
        }
  -
  -     public Message receiveNoWait() throws JMSException
  -     {
  -             super.receiveNoWait();          
  -             if (session.modeStop) return null;
  -             return session.connection.queueReceive(queue,-1);
  -     }       
   
  -     public void setMessageListener(MessageListener listener) throws JMSException
  -     {       
  +     public void setMessageListener(MessageListener listener) throws JMSException {
                super.setMessageListener(listener);
  -             
  -             messageListener=listener;
  -             setListening(listener!=null);
  +             setListening(listener != null);
        }
  -     
  -     //---   
  -     void setListening(boolean newvalue) throws JMSException
  -     {
  -             if (newvalue==listening) return;
  -             listening=newvalue;
  -             
  -             session.getConnection().listenerChange(queue);
  -     }
  -
  -     //Called by the ConnectionReceiver which has just received a message - in the 
Queue case only
  -     public void dispatchMessage(SpyMessage mes) throws JMSException {
   
  -             if (session.closed)
  -                     throw new NoReceiverException("The session is closed");
  -             if (session.modeStop)
  -                     throw new NoReceiverException("The session is stopped");
  -             if (mes.isOutdated())
  +     //---   
  +     void setListening(boolean newvalue) throws JMSException {
  +             if (newvalue == listening)
                        return;
  -                     
  -             //Work with this receiver
  -             if (messageListener == null) {
  -                     synchronized (messages) {
  -                             
  -                             if ( waitInReceive  ) {
  -                                     if( messages.size()==0 ) {
  -                                             addMessage(mes);
  -                                             messages.notify();
  -                                     } else {
  -                                             Log.notice("Got too many messages for 
one receive.!");
  -                                             throw new NoReceiverException("Got too 
many messages for one receive.!");
  -                                     }
  -                             } else {
  -                                     Log.notice("Message did not arrive in time for 
the receive!");
  -                                     throw new NoReceiverException("Message did not 
arrive in time for the receive!");                       
  -                             }
  -                                                     
  -                     }
  -             } else {
  -                     
  -                     if (!isListening())
  -                             throw new NoReceiverException("The receiver is not 
longer listening!");
  -                             
  -                     //Set the session in the message so it can acknowlege
  -                     mes.setSpySession(session);
  -                     messageListener.onMessage(mes);
  -
  -                     if( session.transacted ) {
  -                             
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, mes);
  -                     } else if( session.acknowledgeMode==session.AUTO_ACKNOWLEDGE 
|| session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
  -                             mes.doAcknowledge();
  -                     }       
  -             }
  -             
  -     }       
  +             listening = newvalue;
  +
  +             if (queue != null)
  +                     session.getConnection().listenerChange(queue);
  +     }
   
        public boolean isListening() {
                return listening;
  
  
  
  1.9       +9 -1      spyderMQ/src/java/org/spydermq/SpyQueueSession.java
  
  Index: SpyQueueSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- SpyQueueSession.java      2000/12/19 06:43:33     1.8
  +++ SpyQueueSession.java      2000/12/21 22:33:55     1.9
  @@ -22,19 +22,20 @@
   import java.util.Iterator;
   
   
  +import javax.jms.MessageListener;
  +
   /**
    *   This class implements javax.jms.QueueSession and javax.jms.XAQueueSession
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.8 $
  + *   @version $Revision: 1.9 $
    */
   public class SpyQueueSession 
        extends SpySession 
        implements QueueSession, XAQueueSession
   {
  -     
        // Constructor ---------------------------------------------------
           
        SpyQueueSession(SpyConnection myConnection, boolean transacted, int 
acknowledgeMode, boolean stop, boolean xaSession)
  @@ -111,4 +112,11 @@
                return this;
        }
   
  +     public void setMessageListener(MessageListener listener) throws JMSException
  +     {
  +             
  +             super.setMessageListener(listener);
  +             sessionConsumer = new SpyQueueReceiver(this, null);
  +
  +     }
   }
  
  
  
  1.16      +119 -128  spyderMQ/src/java/org/spydermq/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
  retrieving revision 1.15
  retrieving revision 1.16
  diff -u -r1.15 -r1.16
  --- SpySession.java   2000/12/19 06:43:33     1.15
  +++ SpySession.java   2000/12/21 22:33:55     1.16
  @@ -34,9 +34,9 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.15 $
  + *   @version $Revision: 1.16 $
    */
  -public class SpySession 
  +abstract public class SpySession 
        implements Runnable, Session, XASession
   {    
        // Attributes ----------------------------------------------------
  @@ -49,16 +49,21 @@
        private MessageListener messageListener;
        //The connection object to which this session is linked
        protected SpyConnection connection;
  +     // This consumer is the consumer that receives messages for the MessageListener
  +     // assigned to the session.  The SpyConnectionConsumer delivers messages to it
  +     SpyMessageConsumer sessionConsumer;
  +     //MessageConsumers created by this session
  +     protected HashSet consumers;
   
        //Is my connection in stopped mode ?
        protected boolean modeStop;
        //Is the session closed ?
        boolean closed;
  -     //This object is the object used to synchronize the session's thread - Need 
fixed / improvement
  -     public Mutex mutex;
  -     //MessageConsumers created by this session
  -     protected HashSet consumers;
  -     
  +     // Used to notify the session thread to deliver messages
  +     Mutex mutex;
  +     // Used to lock the run() method
  +     Object runLock=new Object();
  +             
        //The transctionId of the current transaction (registed with the 
SpyXAResourceManager)
        Object currentTransactionId;
        // If this is an XASession, we have an associated XAResource
  @@ -76,26 +81,34 @@
                if( xaSession )
                        spyXAResource = new SpyXAResource(this);
   
  +             mutex = new Mutex();
                messageListener=null;
                closed=false;
  -             mutex=new Mutex();
                consumers = new HashSet();
  -             
  -             //Start my thread 
  -             Thread oneThread=new Thread(this, "SpySession");
  -             oneThread.setDaemon(true);
  -             oneThread.start();
  -             
  -             //Wait for the thread to sleep
  -             mutex.waitLocked();
  -
  +                             
                //Have a TX ready with the resource manager.
                if( spyXAResource==null && transacted )
                        currentTransactionId = 
connection.spyXAResourceManager.startTx();
                        
  +             //Start my thread 
  +             Thread oneThread=new Thread("SpySession") {
  +                     public void run() {
  +                             mutex.acquireLock();
  +                             while( !closed ) {
  +                                     SpySession.this.run();
  +                                     mutex.waitLock();
  +                             }
  +                             mutex.releaseLock();
  +                     }
  +             };
  +             oneThread.setDaemon(true);
  +             oneThread.start();
  +
  +             mutex.waitLocked();             
        }
   
   
  +
        // Public --------------------------------------------------------
        
        public BytesMessage createBytesMessage() throws JMSException
  @@ -189,46 +202,62 @@
   
        public void setMessageListener(MessageListener listener) throws JMSException
        {
  -             if (closed) throw new IllegalStateException("The session is closed");  
         
  -                                                                      
  -             messageListener=listener;
  +             if (closed) throw new IllegalStateException("The session is closed");  
                                          
  +             messageListener = listener;
  +
  +             mutex.notifyLock();
        }
        
  -     //The thread for this session. It sends outgoing messages and delivers 
incoming ones
  +     // Delivers incoming messages in this session
        public void run()
  -     {               
  -             mutex.acquireLock();
  -             
  -             while (true) {
  +     {
  +             synchronized (runLock) {
  +                             
  +                     Log.log("SpySession: Message delivery started");
  +                     
  +                     boolean done=false;     
  +                     while (!done) {
  +                             if (closed) break;
   
  -                     boolean doneJob=false;
  -                     if (closed) break;
  -                                                             
  -                     try {   
  -                             //if we are not in stopped mode, look at the incoming 
queue
  -                             if (!modeStop) {
  -                                     Iterator i=consumers.iterator();
  -                                     while (i.hasNext()) {
  -                                             SpyMessageConsumer 
mc=(SpyMessageConsumer)i.next();
  -                                             doneJob=doneJob||mc.deliverMessage();
  -                                     }                                       
  +                             // If we can deliver any messages, then we are not 
done.
  +                             done = true;
  +                             
  +                             try {
  +                                     
  +                                     //if we are not in stopped mode, look at the 
incoming queue
  +                                     //Consider whether delivery should be stopped because we 
are outside the XA transaction (start/end)
  +                                     boolean xaStop = spyXAResource!=null && 
currentTransactionId==null;
  +                                     if (!(modeStop || xaStop)) {
  +                                             Iterator i;
  +                                             
  +                                             synchronized (consumers) {
  +                                                     i = 
((HashSet)consumers.clone()).iterator();
  +                                             }
  +                                             
  +                                             while (i.hasNext()) {
  +                                                     SpyMessageConsumer 
mc=(SpyMessageConsumer)i.next();
  +                                                     if( mc.deliverMessage() ) 
  +                                                             done = false;
  +                                             }
  +                                             
  +                                             synchronized (consumers) {
  +                                                     if( sessionConsumer != null ) {
  +                                                             if( 
sessionConsumer.deliverMessage() )
  +                                                                     done = false;
  +                                                     }
  +                                             }
  +                                             
  +                                     }
  +                             } catch (JMSException e) {
  +                                     Log.log("Cannot receive a message from the 
provider...");
  +                                     Log.error(e);
                                }
  -                     } catch (JMSException e) {
  -                             Log.log("Cannot receive a message from the 
provider...");
  -                             Log.error(e);
  +                             
                        }
  -                                     
  -                     //If there was smthg to do, try again
  -                     if (doneJob) continue;
  -                                     
  -                     //Log.log("SessionThread: I'm going to bed...");
  -                     mutex.waitLock();
  -                     //Log.log("SessionThread: I wake up");
  -
  +             
                }
  -                     
  -             mutex.releaseLock();
                
  +             Log.log("SpySession: Message delivery ended");
        }
   
        public synchronized void close() throws JMSException
  @@ -236,20 +265,24 @@
                // allow other threads to process before closing this session
                // Patch submitted by John Ellis (10/29/00)
                Thread.yield();
  -                             
  -             if (closed) return;
  -             closed=true;
   
  -             //if the thread is sleeping, kill it            
  -             mutex.notifyLock();
  -             mutex.waitToSleep();
  -             
  -             //notify the sleeping synchronous listeners
  -             
  -             Iterator i=consumers.iterator();
  -             while (i.hasNext()) {
  -                     SpyMessageConsumer 
messageConsumer=(SpyMessageConsumer)i.next();
  -                     messageConsumer.close();
  +             synchronized (runLock) {
  +                     if (closed) return;
  +                     closed=true;                    
  +             }
  +
  +             synchronized (consumers) {
  +                     
  +                     //notify the sleeping synchronous listeners
  +                     if ( sessionConsumer != null )
  +                             sessionConsumer.close();
  +                             
  +                     Iterator i=consumers.iterator();
  +                     while (i.hasNext()) {
  +                             SpyMessageConsumer 
messageConsumer=(SpyMessageConsumer)i.next();
  +                             messageConsumer.close();
  +                     }
  +                     
                }       
                
                connection.sessionClosing(this);
  @@ -263,17 +296,12 @@
                if (spyXAResource!=null) throw new 
javax.jms.TransactionInProgressException("Should not be call from a XASession");
                if (closed) throw new IllegalStateException("The session is closed");  
         
                if (!transacted) throw new IllegalStateException("The session is not 
transacted");
  -
                        
                Log.log("Session: commit()");
   
  -             boolean modeSav=modeStop;
  -             modeStop=true;
  -             
  -             //Wait for the thread to sleep
  -             synchronized (mutex) {                  
  -                     mutex.waitToSleep();
  -
  +             //Don't deliver any more messages while committing
  +             synchronized (runLock) {
  +                     
                        // commit transaction with onePhase commit
                        try {
                                
connection.spyXAResourceManager.commit(currentTransactionId, true);
  @@ -288,9 +316,6 @@
                                } catch ( Exception ignore ) {}
                        }                                               
                                                                                
  -                     //We have finished our work, we can wake up the thread
  -                     modeStop=modeSav;
  -                     mutex.notifyLock();                     
                }
                
        }
  @@ -303,15 +328,10 @@
                if (!transacted) throw new IllegalStateException("The session is not 
transacted");
                                                                         
                Log.log("Session: rollback()");
  -
  -             boolean modeSav=modeStop;
  -             modeStop=true;
  -             
  -             //Wait for the thread to sleep
  -             synchronized (mutex) {
  +     
  +             // Stop message delivery
  +             synchronized (runLock) {
                        
  -                     mutex.waitToSleep();
  -
                        // rollback transaction
                        try {
                                
connection.spyXAResourceManager.rollback(currentTransactionId);
  @@ -324,11 +344,8 @@
                                try {
                                        currentTransactionId = 
connection.spyXAResourceManager.startTx();
                                } catch ( Exception ignore ) {}
  -                     }                                               
  -                                             
  -                     //We have finished our work, we can wake up the thread
  -                     modeStop=modeSav;
  -                     mutex.notifyLock();
  +                     }
  +                     
                }
        }
   
  @@ -365,27 +382,17 @@
                return connection.getNewMessageID();
        }
        
  -     //The connection has changed its mode (stop() or start())
  -     //We have to wait until message delivery has stopped or wake up the thread
  -     void notifyStopMode(boolean newValue)
  -     {
  -             
  -             if (closed) throw new IllegalStateException("The session is closed");  
                                                                  
  -             if (modeStop==newValue) return; 
  -             
  -             modeStop=newValue;
   
  -             notifyStopMode();
  -             
  -     }
                
        void removeConsumer(Destination dest, SpyMessageConsumer who) throws 
JMSException
        {
                Log.log("Session: 
removeConsumer(Destination="+dest.toString()+",MessageConsumer="+who.toString()+")");
  +
  +             synchronized (connection) {                     
  +                     connection.removeConsumer(dest, who );          
  +             }
                
                consumers.remove( who );
  -             
  -             connection.removeConsumer(dest, who );          
        }
        
        void addConsumer(Destination dest, SpyMessageConsumer who) throws JMSException
  @@ -393,22 +400,20 @@
                if (closed) throw new IllegalStateException("The session is closed");
                                                                                
                Log.log("Session: 
subscribe(dest="+dest.toString()+",MessageConsumer="+who.toString()+")");
  -             connection.addConsumer(dest, who);
  +
  +             synchronized (consumers) {
  +                     consumers.add( who );                   
  +             }
   
  -             consumers.add( who );
  +             connection.addConsumer(dest, who);
                
        }
   
        
  -     /**
  -      * @return org.spydermq.SpyConnection
  -      */
        public SpyConnection getConnection() {
                return connection;
        }
  -     
   
  -
        
        //called by a MessageProducer object which needs to publish a message
        void sendMessage(SpyMessage m) throws JMSException {
  @@ -423,35 +428,21 @@
                
        }
   
  -     
   
  -     /**
  -      * getXAResource method comment.
  -      */
        public javax.transaction.xa.XAResource getXAResource() {
  -             return null;
  +             return spyXAResource;
        }
   
  -     // The XA transaction state may have changed... either it started or ended
  -     // We have to wait until message delivery has stopped or wake up the thread
  -     void notifyStopMode()
  +     //The connection has changed its mode (stop() or start())
  +     //We have to wait until message delivery has stopped or wake up the thread
  +     void setStopMode(boolean newValue)
        {
                
  -             // Should it be stopped because we are outside the XA transaction 
(start/end)
  -             boolean xaStop = spyXAResource!=null && currentTransactionId==null;
  +             if (closed) throw new IllegalStateException("The session is closed");  
                                                                  
  +             if (modeStop==newValue) return; 
                
  -             if ( modeStop || xaStop ) {
  -                     
  -                     //Wait for the thread to sleep
  -                     mutex.waitToSleep();
  -                                             
  -             } else {
  -                     
  -                     //Wake up the thread
  -                     mutex.notifyLock();
  -                     
  -             }
  +             modeStop=newValue;
                
  +             mutex.notifyLock();
        }
  -     
   }
  
  
  
  1.11      +10 -0     spyderMQ/src/java/org/spydermq/SpyTopicSession.java
  
  Index: SpyTopicSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSession.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- SpyTopicSession.java      2000/12/19 06:43:33     1.10
  +++ SpyTopicSession.java      2000/12/21 22:33:55     1.11
  @@ -27,13 +27,15 @@
   import org.spydermq.Log;
   
   
  +import javax.jms.MessageListener;
  +
   /**
    *   This class implements javax.jms.TopicSession and javax.jms.XATopicSession
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.10 $
  + *   @version $Revision: 1.11 $
    */
   public class SpyTopicSession 
        extends SpySession 
  @@ -122,5 +124,13 @@
         */
        public javax.jms.TopicSession getTopicSession() throws javax.jms.JMSException {
                return this;
  +     }
  +
  +     public void setMessageListener(MessageListener listener) throws JMSException
  +     {
  +             
  +             super.setMessageListener(listener);
  +             sessionConsumer = new SpyTopicSubscriber(this, null, false);
  +
        }
   }
  
  
  
  1.8       +17 -104   spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java
  
  Index: SpyTopicSubscriber.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- SpyTopicSubscriber.java   2000/12/13 15:59:10     1.7
  +++ SpyTopicSubscriber.java   2000/12/21 22:33:56     1.8
  @@ -20,7 +20,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.7 $
  + *   @version $Revision: 1.8 $
    */
   public class SpyTopicSubscriber 
        extends SpyMessageConsumer 
  @@ -36,8 +36,7 @@
           
        SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean noLocal) 
        {
  -             super(session);
  -             destination=topic;
  +             super(session, (SpyTopic)topic);
                this.topic=topic;
                this.noLocal=noLocal;
        }
  @@ -59,115 +58,29 @@
        //Overrides MessageConsumer
   
        public void close() throws JMSException
  -     {               
  -             if (closed) return;
  -             closed=true;
  -
  -             session.removeConsumer(topic,this);
  -             
  -             if (waitInReceive&&messageListener==null) {
  -                     
  -                     //A consumer could be waiting in receive()
  -                     synchronized (messages) {
  -                             messages.notify();
  -                     }
  -                     
  -             }
  -     }
  -             
  -     public Message receive() throws JMSException
        {
  -             super.receive();
  -             
  -             synchronized (messages) {
  -                     
  -                     //if the client follows the specification [4.4.6], he cannot 
use this session
  -                     //to asynchronously receive a message or receive() in another 
thread.
  -                     //If a message is already pending for this session, we can 
immediatly deliver it 
  -                     
  -                     while (true) {
  -                             
  -                             if (closed) return null;
  -                             
  -                             if (!session.modeStop) {
  -                                     Message mes=getMessage();
  -                                     if (mes!=null) return mes;
  -                             } else Log.notice("the connection is stopped !");
  -                             
  -                             try {
  -                                     waitInReceive=true;
  -                                     messages.wait();
  -                             } catch (InterruptedException e) {
  -                             } finally {
  -                                     waitInReceive=false;
  -                             }
  -                             
  -                     }
  -             }
  -     }
   
  -     public Message receive(long timeOut) throws JMSException
  -     {
  -             super.receive(timeOut);
  -             
  -             if (timeOut==0) return receive();
  -             
  -             long endTime=System.currentTimeMillis()+timeOut;
  -             
                synchronized (messages) {
  -                     
  -                     //if the client respects the specification [4.4.6], he cannot 
use this session
  -                     //to asynchronously receive a message or receive() from 
another thread.
  -                     //If a message is already pending for this session, we can 
deliver it 
  +                     if (closed)
  +                             return;
  +
  +                     if (topic != null)
  +                             session.removeConsumer(topic, this);
                                                
  -                     while (true) {
  -                             
  -                             if (closed) return null;
  -                             
  -                             if (!session.modeStop) {
  -                                     Message mes=getMessage();
  -                                     if (mes!=null) return mes;
  -                             } else Log.notice("the connection is stopped !");
  -                             
  -                             long att=endTime-System.currentTimeMillis();
  -                             if (att<=0) return null;
  -                             
  -                             try {                                   
  -                                     waitInReceive=true;
  -                                     messages.wait(att);
  -                             } catch (InterruptedException e) {
  -                             } finally {
  -                                     waitInReceive=false;
  -                             }
  -                             
  +                     if (waitInReceive && messageListener == null) {
  +                             //A consumer could be waiting in receive()
  +                             messages.notify();
                        }
  -             }
  -             
  -     }
  -
  -     public Message receiveNoWait() throws JMSException
  -     {
  -             super.receiveNoWait();
  -             
  -             synchronized (messages) {
                        
  -                     while (true) {
  -                             if (session.modeStop) return null;
  -                             return getMessage();
  -                     }
  -                                             
  +                     closed = true;
                }
  -     }
  -
  -     public void setMessageListener(MessageListener listener) throws JMSException
  -     {       
  -             super.setMessageListener(listener);
  -             
  -             messageListener=listener;
                
  -             //Signal the change to the session thread ( it could sleep, while 
there are messages for him )
  -             session.mutex.notifyLock();
  -             
        }
        
  +     /**
  +      * A topic is always accepting messages from a destination.
  +      */
  +     public boolean isListening() {
  +             return true;
  +     }
   }
  
  
  
  1.2       +45 -36    spyderMQ/src/java/org/spydermq/SpyXAResource.java
  
  Index: SpyXAResource.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyXAResource.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyXAResource.java        2000/12/19 06:43:32     1.1
  +++ SpyXAResource.java        2000/12/21 22:33:56     1.2
  @@ -16,10 +16,10 @@
    *      
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class SpyXAResource implements XAResource {
  -     
  +
        //////////////////////////////////////////////////////////////////
        // Attributes
        //////////////////////////////////////////////////////////////////
  @@ -37,7 +37,7 @@
        //////////////////////////////////////////////////////////////////
        // Public Methods
        //////////////////////////////////////////////////////////////////
  -     
  +
        /**
         * commit method comment.
         */
  @@ -48,7 +48,7 @@
                        throw new XAException(XAException.XAER_RMERR);
                }
        }
  -     
  +
        /**
         * end method comment.
         */
  @@ -56,36 +56,39 @@
                if (session.currentTransactionId == null) {
                        throw new XAException(XAException.XAER_OUTSIDE);
                }
  -             switch (flags) {
  -                     case TMSUSPEND :
  -                             session.currentTransactionId = null;
  -                             session.connection.spyXAResourceManager.suspendTx(xid);
  -                             break;
  -                     case TMFAIL :
  -                             session.currentTransactionId = null;
  -                             session.connection.spyXAResourceManager.endTx(xid, 
false);
  -                             break;
  -                     case TMSUCCESS :
  -                             session.currentTransactionId = null;
  -                             session.connection.spyXAResourceManager.endTx(xid, 
true);
  -                             break;
  +
  +             synchronized (session.runLock) {
  +
  +                     switch (flags) {
  +                             case TMSUSPEND :
  +                                     session.currentTransactionId = null;
  +                                     
session.connection.spyXAResourceManager.suspendTx(xid);
  +                                     break;
  +                             case TMFAIL :
  +                                     session.currentTransactionId = null;
  +                                     
session.connection.spyXAResourceManager.endTx(xid, false);
  +                                     break;
  +                             case TMSUCCESS :
  +                                     session.currentTransactionId = null;
  +                                     
session.connection.spyXAResourceManager.endTx(xid, true);
  +                                     break;
  +                     }
                }
  -             session.notifyStopMode();
        }
  -     
  +
        /**
         * forget method comment.
         */
        public void forget(javax.transaction.xa.Xid arg1) throws 
javax.transaction.xa.XAException {
        }
  -     
  +
        /**
         * getTransactionTimeout method comment.
         */
        public int getTransactionTimeout() throws javax.transaction.xa.XAException {
                return 0;
        }
  -     
  +
        /**
         * isSameRM method comment.
         */
  @@ -94,7 +97,7 @@
                        return false;
                return ((SpyXAResource) arg1).session.connection.spyXAResourceManager 
== session.connection.spyXAResourceManager;
        }
  -     
  +
        /**
         * prepare method comment.
         */
  @@ -105,14 +108,14 @@
                        throw new XAException(XAException.XAER_RMERR);
                }
        }
  -     
  +
        /**
         * recover method comment.
         */
        public Xid[] recover(int arg1) throws javax.transaction.xa.XAException {
                return new Xid[0];
        }
  -     
  +
        /**
         * rollback method comment.
         */
  @@ -123,14 +126,14 @@
                        throw new XAException(XAException.XAER_RMERR);
                }
        }
  -     
  +
        /**
         * setTransactionTimeout method comment.
         */
        public boolean setTransactionTimeout(int arg1) throws 
javax.transaction.xa.XAException {
                return false;
        }
  -     
  +
        /**
         * start method comment.
         */
  @@ -138,17 +141,23 @@
                if (session.currentTransactionId != null) {
                        throw new XAException(XAException.XAER_OUTSIDE);
                }
  -             switch (flags) {
  -                     case TMNOFLAGS :
  -                             session.currentTransactionId = 
session.connection.spyXAResourceManager.startTx(xid);
  -                             break;
  -                     case TMJOIN :
  -                             session.currentTransactionId = 
session.connection.spyXAResourceManager.joinTx(xid);
  -                             break;
  -                     case TMRESUME :
  -                             session.currentTransactionId = 
session.connection.spyXAResourceManager.resumeTx(xid);
  -                             break;
  +
  +             synchronized (session.runLock) {
  +
  +                     switch (flags) {
  +                             case TMNOFLAGS :
  +                                     session.currentTransactionId = 
session.connection.spyXAResourceManager.startTx(xid);
  +                                     break;
  +                             case TMJOIN :
  +                                     session.currentTransactionId = 
session.connection.spyXAResourceManager.joinTx(xid);
  +                                     break;
  +                             case TMRESUME :
  +                                     session.currentTransactionId = 
session.connection.spyXAResourceManager.resumeTx(xid);
  +                                     break;
  +                     }
  +                     session.runLock.notify();
  +
                }
  -             session.notifyStopMode();
  +
        }
   }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyConnectionConsumer.java
  
  Index: SpyConnectionConsumer.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.ServerSessionPool;
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import javax.jms.ServerSession;
  
  import java.util.Iterator;
  import java.util.LinkedList;
  
  /**
   *    This class implements javax.jms.ConnectionConsumer
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyConnectionConsumer implements javax.jms.ConnectionConsumer, 
SpyConsumer {
  
        // The connection is the consumer was created with
        SpyConnection connection;
        // The destination this consumer will receive messages from
        Destination destination;
        // The ServerSessionPool that is implemented by the AS
        javax.jms.ServerSessionPool serverSessionPool;
        // The maximum number of messages that a single session will be loaded with.
        int maxMessages;
        // This queue will hold messages until they are dispatched to the 
MessageListener       
        LinkedList queue = new LinkedList();
        // Is the ConnectionConsumer closed?
        boolean closed;
  
        /**
         * SpyConnectionConsumer constructor comment.
         */
        public SpyConnectionConsumer(SpyConnection connection, Destination 
destination, String messageSelector, ServerSessionPool serverSessionPool, int 
maxMessages)
                throws JMSException {
  
                this.connection = connection;
                this.destination = destination;
                this.serverSessionPool = serverSessionPool;
                this.maxMessages = maxMessages;
  
                connection.addConsumer(destination, this);
  
        }
  
        public void addMessage(SpyMessage mes) throws JMSException {
                queue.addLast(mes);
        }
  
        /**
         * close method comment.
         */
        public void close() throws javax.jms.JMSException {
  
                connection.removeConsumer(destination, this);
                closed = true;
  
        }
        
        /**
         * getServerSessionPool method comment.
         */
        public javax.jms.ServerSessionPool getServerSessionPool() throws 
javax.jms.JMSException {
                return serverSessionPool;
        }
        
        public boolean isListening() {
                return true;
        }
        
        public boolean isReceiving() {
                return false;
        }
        
        public void processMessages() throws JMSException {
  
                ServerSession serverSession = serverSessionPool.getServerSession();
                SpySession spySession = (SpySession) serverSession.getSession();
                if (spySession.sessionConsumer == null)
                        throw new JMSException("Session did not have a set 
MessageListner");
  
                int loadCounter = 0;
  
                Iterator iter = queue.iterator();
                while (iter.hasNext()) {
  
                        loadCounter++;
                        SpyMessage message = (SpyMessage) iter.next();
                        spySession.sessionConsumer.addMessage(message);
  
                        if (loadCounter >= maxMessages) {
  
                                serverSession.start();
  
                                serverSession = serverSessionPool.getServerSession();
                                spySession = (SpySession) serverSession.getSession();
  
                                if (spySession.sessionConsumer == null)
                                        throw new JMSException("Session did not have a 
set MessageListner");
                        }
  
                }
  
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyConsumer.java
  
  Index: SpyConsumer.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.JMSException;
  
  /**
   *    This class defines the interface which is used by the ConnectionReceiver to
   *  send messages to the consumers.
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public interface SpyConsumer 
  {
        // A ConnectionReceiver uses this method to load a Consumer with a message
        public void addMessage(SpyMessage mes) throws JMSException;
        // This is used the Connection class (it maintains a list of consumers) to see 
who is receiving messages
        public boolean isListening();
        // This is used the Connection class (it maintains a list of consumers) to see 
who is receiving messages
        public boolean isReceiving();
        // This is called by a ConnectionReceiver after it is finished loading 
messages into the consumer.
        public void processMessages() throws JMSException;
  }
  
  
  

Reply via email to