User: hiram   
  Date: 00/12/23 07:48:23

  Modified:    src/java/org/spydermq/distributed/server
                        ConnectionReceiverOIL.java
                        ConnectionReceiverOILClient.java
                        ConnectionReceiverRMI.java
                        ConnectionReceiverRMIImpl.java
                        ConnectionReceiverUIL.java
                        ConnectionReceiverUILClient.java
                        DistributedJMSServerOIL.java
                        DistributedJMSServerOILClient.java
                        DistributedJMSServerRMI.java
                        DistributedJMSServerRMIImpl.java
                        DistributedJMSServerUIL.java
                        DistributedJMSServerUILClient.java
  Log:
  These changes add the following features:
  - Message selectors are now evaluated on the server side.
  - The infrastructure for durable topic subscriptions has been laid.
  - The QueueBrowser has been implemented.
  - Queues can now have a selector.
  
  Revision  Changes    Path
  1.13      +11 -113   
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
  
  Index: ConnectionReceiverOIL.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- ConnectionReceiverOIL.java        2000/12/21 22:33:58     1.12
  +++ ConnectionReceiverOIL.java        2000/12/23 15:48:20     1.13
  @@ -42,7 +42,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.12 $
  + *   @version $Revision: 1.13 $
    */
   public class ConnectionReceiverOIL implements Runnable, ConnectionReceiverSetup {
        // Attributes ----------------------------------------------------
  @@ -51,23 +51,19 @@
        private SpyConnection connection;
        //Is my connection closed ?
        private boolean closed;
  +     private ServerSocket serverSocket;
   
  -     // Constructor ---------------------------------------------------
  +     // Internals -----------------------------------------------------
  +     static final int m_close = 2;
  +     static final int m_deleteTemporaryDestination = 1;
  +     static final int m_receive = 3;
   
  +     // Constructor ---------------------------------------------------      
        public ConnectionReceiverOIL() {
                closed = false;
                exportObject();
        }
   
  -     // Internals -----------------------------------------------------
  -
  -     static final int RECEIVE = 1;
  -     static final int RECEIVE_MULTIPLE = 2;
  -     static final int DELETE_TEMPORARY_DESTINATION = 3;
  -     static final int CLOSE = 4;
  -
  -     private ServerSocket serverSocket;
  -
        void exportObject() {
                try {
                        serverSocket = new ServerSocket(0);
  @@ -110,18 +106,13 @@
                        try {
   
                                switch (code) {
  -                                     case RECEIVE :
  -                                             receive((SpyDestination) 
in.readObject(), (SpyMessage) in.readObject());
  -                                             break;
  -                                     case RECEIVE_MULTIPLE :
  -                                             SpyDestination dest = (SpyDestination) 
in.readObject();
  -                                             int nb = in.readInt();
  -                                             receiveMultiple(dest, nb, in);
  +                                     case m_receive:
  +                                             
connection.deliver((org.spydermq.ReceiveRequest[])in.readObject());
                                                break;
  -                                     case DELETE_TEMPORARY_DESTINATION :
  -                                             
deleteTemporaryDestination((SpyDestination) in.readObject());
  +                                     case m_deleteTemporaryDestination:
  +                                             
connection.deleteTemporaryDestination((SpyDestination) in.readObject());
                                                break;
  -                                     case CLOSE :
  +                                     case m_close :
                                                close();
                                                break;
                                        default :
  @@ -164,8 +155,6 @@
                Log.error(e);
        }
   
  -     // Public --------------------------------------------------------      
  -
        public void setConnection(SpyConnection connection) {
                this.connection = connection;
        }
  @@ -174,100 +163,9 @@
                return new ConnectionReceiverOILClient(InetAddress.getLocalHost(), 
serverSocket.getLocalPort());
        }
   
  -     // ---
  -
  -     //<DEBUG>
  -
  -     /*public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
  -     {
  -             connection.rec++;
  -     }
  -     
  -     public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
JMSException
  -     {
  -             connection.rec++;
  -     }*/
  -
  -     //</DEBUG>
  -
  -     //A message has arrived for this Connection, We have to dispatch it to the 
sessions
  -     public void receive(SpyDestination dest, SpyMessage mes) throws JMSException {
  -             if (closed)
  -                     throw new IllegalStateException("The connection is closed");
  -
  -             Log.log("ConnectionReceiver: Receive(Destination=" + dest.toString() + 
",Mes=" + mes.toString() + ")");
  -
  -             if (connection instanceof SpyTopicConnection) {
  -
  -                     //Get the set of subscribers for this Topic
  -                     SpyConsumer consumers[] = connection.getConsumers(dest);
  -
  -                     for (int i = 0; i < consumers.length; i++) {
  -
  -                             //add the new message to the consumer's queue
  -                             consumers[i].addMessage(mes);
  -
  -                             //There is work to do... 
  -                             consumers[i].processMessages();
  -                     }
  -
  -             } else {
  -
  -                     //Find one session waiting for this Queue
  -                     if (connection.modeStop)
  -                             throw new JMSException("This connection is stopped !");
  -
  -                     SpyConsumer consumer = connection.pickListeningConsumer(dest);
  -                     if (consumer == null)
  -                             throw new NoReceiverException("There are no listening 
sessions for this destination !");
  -
  -                     //Try with this sessionQueue
  -                     Log.log("Dispatching to SessionQueue: " + mes);
  -                     consumer.addMessage(mes);
  -                     consumer.processMessages();
  -
  -             }
  -
  -     }
  -
  -     public void receiveMultiple(SpyDestination dest, int nb, ObjectInputStream in) 
throws Exception {
  -             if (closed)
  -                     throw new IllegalStateException("The connection is closed");
  -
  -             Log.log("ConnectionReceiver: ReceiveMultiple()");
  -
  -             if (connection instanceof SpyTopicConnection) {
  -
  -                     //Get the set of subscribers for this Topic
  -                     SpyConsumer consumers[] = connection.getConsumers(dest);
  -
  -                     for (int val = 0; val < nb; val++) {
  -                             SpyMessage mes = (SpyMessage) in.readObject();
  -
  -                             for (int i = 0; i < consumers.length; i++) {
  -
  -                                     //add the new message to the consumer's queue
  -                                     consumers[i].addMessage(mes);
  -
  -                                     //There is work to do... 
  -                                     consumers[i].processMessages();
  -                             }
  -                     }
  -             } else {
  -                     throw new Exception("Multiple dispatch for a Queue");
  -             }
  -     }
  -
        public void close() throws Exception {
                closed = true;
        }
   
  -     //One TemporaryDestination has been deleted
  -     public void deleteTemporaryDestination(SpyDestination dest) throws 
JMSException {
  -             if (closed)
  -                     throw new IllegalStateException("The connection is closed");
  -
  -             connection.deleteTemporaryDestination(dest);
  -     }
   
   }
  
  
  
  1.13      +17 -28    
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java
  
  Index: ConnectionReceiverOILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- ConnectionReceiverOILClient.java  2000/12/12 05:58:50     1.12
  +++ ConnectionReceiverOILClient.java  2000/12/23 15:48:20     1.13
  @@ -25,28 +25,29 @@
   import java.net.InetAddress;
   import java.net.SocketException;
   
  +import org.spydermq.ReceiveRequest;
  +
   /**
    *   The UIL implementation of the ConnectionReceiver object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.12 $
  + *   @version $Revision: 1.13 $
    */
   public class ConnectionReceiverOILClient
        implements ConnectionReceiver, Serializable
   {
  -     static final int RECEIVE = 1;
  -     static final int RECEIVE_MULTIPLE = 2;
  -     static final int DELETE_TEMPORARY_DESTINATION = 3;
  -     static final int CLOSE = 4;             
  -     
  +
        private transient Socket socket;
        private transient ObjectOutputStream out;
        private transient ObjectInputStream in;
  -
        private int port;
        private InetAddress addr;
  +
  +     static final int m_close = 2;
  +     static final int m_deleteTemporaryDestination = 1;
  +     static final int m_receive = 3;
        
        public ConnectionReceiverOILClient(InetAddress addr,int port)
        {
  @@ -94,30 +95,11 @@
                        throw throwException;
        }
        
  -     public void receive(SpyDestination dest,SpyMessage mes) throws Exception
  -     {               
  -             checkSocket();
  -             out.writeByte(RECEIVE);
  -             out.writeObject(dest);
  -             out.writeObject(mes);
  -             waitAnswer();
  -     }
  -             
  -     public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
Exception      
  -     {
  -             checkSocket();
  -             out.writeByte(RECEIVE_MULTIPLE);
  -             out.writeObject(dest);
  -             out.writeInt(mes.length);
  -             for(int i=0;i<mes.length;i++)
  -                     out.writeObject(mes[i]);
  -             waitAnswer();
  -     }       
        
        public void deleteTemporaryDestination(SpyDestination dest) throws Exception
        {
                checkSocket();
  -             out.writeByte(DELETE_TEMPORARY_DESTINATION);
  +             out.writeByte(m_deleteTemporaryDestination);
                out.writeObject(dest);
                waitAnswer();
        }       
  @@ -125,7 +107,7 @@
        public void close() throws Exception
        {
                checkSocket();
  -             out.writeByte(CLOSE);
  +             out.writeByte(m_close);
                waitAnswer();
        }
        
  @@ -134,4 +116,11 @@
                if (socket==null) createConnection();
        }
   
  +     public void receive(ReceiveRequest messages[]) throws Exception
  +     {               
  +             checkSocket();
  +             out.writeByte(m_receive);
  +             out.writeObject(messages);
  +             waitAnswer();
  +     }
   }
  
  
  
  1.5       +10 -7     
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMI.java
  
  Index: ConnectionReceiverRMI.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMI.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- ConnectionReceiverRMI.java        2000/12/12 05:58:52     1.4
  +++ ConnectionReceiverRMI.java        2000/12/23 15:48:20     1.5
  @@ -9,29 +9,32 @@
   import javax.jms.Destination;
   import javax.jms.JMSException;
   
  -import org.spydermq.SpyMessage;
  +
   import org.spydermq.SpyDestination;
   import org.spydermq.distributed.interfaces.ConnectionReceiver;
   
   import java.rmi.Remote;
   import java.rmi.RemoteException;
   
  +import org.spydermq.ReceiveRequest;
  +
   /**
    *   The RMI interface of the ConnectionReceiver object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public interface ConnectionReceiverRMI 
        extends ConnectionReceiver, Remote
   {
  -     // Public --------------------------------------------------------
   
  -     //A message has arrived for the Connection 
  -     public void receive(SpyDestination b,SpyMessage c) throws RemoteException, 
JMSException;
  -     //Messages have arrived for the Connection 
  -     public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
RemoteException, JMSException;
  +
        //One TemporaryDestination has been deleted
        public void deleteTemporaryDestination(SpyDestination dest) throws 
RemoteException, JMSException;
  +
  +     // Public --------------------------------------------------------
  +
  +     //A message has arrived for the Connection 
  +     public void receive(ReceiveRequest messages[]) throws RemoteException, 
Exception;       
   }
  
  
  
  1.15      +11 -77    
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java
  
  Index: ConnectionReceiverRMIImpl.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -r1.14 -r1.15
  --- ConnectionReceiverRMIImpl.java    2000/12/21 22:33:58     1.14
  +++ ConnectionReceiverRMIImpl.java    2000/12/23 15:48:20     1.15
  @@ -10,31 +10,22 @@
   import javax.jms.JMSException;
   
   import org.spydermq.SpyConnection;
  -import org.spydermq.SpyMessage;
  -import org.spydermq.SpySession;
   import org.spydermq.SpyDestination;
  -import org.spydermq.SpyTopicConnection;
  -import org.spydermq.SpyQueueSession;
   import org.spydermq.Log;
  -import org.spydermq.NoReceiverException;
   import org.spydermq.distributed.interfaces.ConnectionReceiver;
   import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
  -import org.spydermq.SpyConsumer;
  +import org.spydermq.ReceiveRequest;
   
   import java.rmi.RemoteException; 
   import java.rmi.server.UnicastRemoteObject;
  -import java.util.Hashtable;
  -import java.util.HashSet;
  -import java.util.Iterator;
   
  -
   /**
    *   The RMI implementation of the ConnectionReceiver object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.14 $
  + *   @version $Revision: 1.15 $
    */
   public class ConnectionReceiverRMIImpl 
        extends UnicastRemoteObject 
  @@ -62,73 +53,6 @@
                this.connection=connection;
        }
        
  -     //A message has arrived for this Connection, We have to dispatch it to the 
sessions
  -     public void receive(SpyDestination dest, SpyMessage mes) throws JMSException {
  -             if (closed)
  -                     throw new IllegalStateException("The connection is closed");
  -
  -             Log.log("ConnectionReceiver: Receive(Destination=" + dest.toString() + 
",Mes=" + mes.toString() + ")");
  -
  -             if (connection instanceof SpyTopicConnection) {
  -
  -                     //Get the set of subscribers for this Topic
  -                     SpyConsumer consumers[] = connection.getConsumers(dest);
  -
  -                     for (int i = 0; i < consumers.length; i++) {
  -
  -                             //add the new message to the consumer's queue
  -                             consumers[i].addMessage(mes);
  -
  -                             //There is work to do... 
  -                             consumers[i].processMessages();
  -                     }
  -
  -             } else {
  -
  -                     //Find one session waiting for this Queue
  -                     if (connection.modeStop)
  -                             throw new JMSException("This connection is stopped !");
  -
  -                     SpyConsumer consumer = connection.pickListeningConsumer(dest);
  -                     if (consumer == null)
  -                             throw new NoReceiverException("There are no listening 
sessions for this destination !");
  -
  -                     //Try with this sessionQueue
  -                     Log.log("Dispatching to SessionQueue: " + mes);
  -                     consumer.addMessage(mes);
  -                     consumer.processMessages();
  -
  -             }
  -     } 
  -
  -     public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
JMSException
  -     {
  -             if (closed)
  -                     throw new IllegalStateException("The connection is closed");
  -
  -             Log.log("ConnectionReceiver: ReceiveMultiple()");
  -
  -             if (connection instanceof SpyTopicConnection) {
  -
  -                     //Get the set of subscribers for this Topic
  -                     SpyConsumer consumers[] = connection.getConsumers(dest);
  -
  -                     for(int i=0;i<mes.length;i++) {
  -
  -                             for (int j = 0; j < consumers.length; j++) {
  -
  -                                     //add the new message to the consumer's queue
  -                                     consumers[j].addMessage(mes[i]);
  -
  -                                     //There is work to do... 
  -                                     consumers[j].processMessages();
  -                             }
  -                     }
  -             } else {
  -                     throw new JMSException("Multiple dispatch for a Queue");
  -             }               
  -     }
  -     
        public void close() throws Exception
        {
                closed=true;            
  @@ -147,4 +71,14 @@
                return this;
        }
        
  +     public void receive(ReceiveRequest messages[]) throws Exception {
  +             
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +
  +             Log.log("ConnectionReceiver: Receive(ReceiveRequest[" + 
messages.length +"])");
  +
  +             connection.deliver(messages);
  +
  +     }
   }
  
  
  
  1.6       +8 -92     
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUIL.java
  
  Index: ConnectionReceiverUIL.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUIL.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- ConnectionReceiverUIL.java        2000/12/21 22:33:58     1.5
  +++ ConnectionReceiverUIL.java        2000/12/23 15:48:21     1.6
  @@ -42,10 +42,13 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.5 $
  + *   @version $Revision: 1.6 $
    */
   public class ConnectionReceiverUIL implements Runnable, ConnectionReceiverSetup {
        // Attributes ----------------------------------------------------
  +     static final int m_close = 2;
  +     static final int m_deleteTemporaryDestination = 1;
  +     static final int m_receive = 3;
   
        //A link on my connection
        private SpyConnection connection;
  @@ -58,13 +61,6 @@
                closed = false;
        }
   
  -     // Internals -----------------------------------------------------
  -
  -     static final int RECEIVE = 1;
  -     static final int RECEIVE_MULTIPLE = 2;
  -     static final int DELETE_TEMPORARY_DESTINATION = 3;
  -     static final int CLOSE = 4;
  -
        void exportObject() {
                Thread thread = new Thread(this, "ConnectionReceiverUIL");
                thread.setDaemon(true);
  @@ -106,18 +102,13 @@
                        try {
   
                                switch (code) {
  -                                     case RECEIVE :
  -                                             receive((SpyDestination) 
in.readObject(), (SpyMessage) in.readObject());
  +                                     case m_receive:
  +                                             
connection.deliver((org.spydermq.ReceiveRequest[])in.readObject());
                                                break;
  -                                     case RECEIVE_MULTIPLE :
  -                                             SpyDestination dest = (SpyDestination) 
in.readObject();
  -                                             int nb = in.readInt();
  -                                             receiveMultiple(dest, nb, in);
  +                                     case m_deleteTemporaryDestination:
  +                                             
connection.deleteTemporaryDestination((SpyDestination) in.readObject());
                                                break;
  -                                     case DELETE_TEMPORARY_DESTINATION :
  -                                             
deleteTemporaryDestination((SpyDestination) in.readObject());
  -                                             break;
  -                                     case CLOSE :
  +                                     case m_close :
                                                close();
                                                break;
                                        default :
  @@ -191,83 +182,8 @@
                return new ConnectionReceiverUILClient();
        }
   
  -     //A message has arrived for this Connection, We have to dispatch it to the 
sessions
  -     public void receive(SpyDestination dest, SpyMessage mes) throws JMSException {
  -             if (closed)
  -                     throw new IllegalStateException("The connection is closed");
  -
  -             Log.log("ConnectionReceiver: Receive(Destination=" + dest.toString() + 
",Mes=" + mes.toString() + ")");
  -
  -             if (connection instanceof SpyTopicConnection) {
  -
  -                     //Get the set of subscribers for this Topic
  -                     SpyConsumer consumers[] = connection.getConsumers(dest);
  -
  -                     for (int i = 0; i < consumers.length; i++) {
  -
  -                             //add the new message to the consumer's queue
  -                             consumers[i].addMessage(mes);
  -
  -                             //There is work to do... 
  -                             consumers[i].processMessages();
  -                     }
  -
  -             } else {
  -
  -                     //Find one session waiting for this Queue
  -                     if (connection.modeStop)
  -                             throw new JMSException("This connection is stopped !");
  -
  -                     SpyConsumer consumer = connection.pickListeningConsumer(dest);
  -                     if (consumer == null)
  -                             throw new NoReceiverException("There are no listening 
sessions for this destination !");
  -
  -                     //Try with this sessionQueue
  -                     Log.log("Dispatching to SessionQueue: " + mes);
  -                     consumer.addMessage(mes);
  -                     consumer.processMessages();
  -
  -             }
  -     }
  -
  -     public void receiveMultiple(SpyDestination dest, int nb, ObjectInputStream in) 
throws Exception {
  -             if (closed)
  -                     throw new IllegalStateException("The connection is closed");
  -
  -             Log.log("ConnectionReceiver: ReceiveMultiple()");
  -
  -             if (connection instanceof SpyTopicConnection) {
  -
  -                     //Get the set of subscribers for this Topic
  -                     SpyConsumer consumers[] = connection.getConsumers(dest);
  -
  -                     for (int val = 0; val < nb; val++) {
  -                             SpyMessage mes = (SpyMessage) in.readObject();
  -
  -                             for (int i = 0; i < consumers.length; i++) {
  -
  -                                     //add the new message to the consumer's queue
  -                                     consumers[i].addMessage(mes);
  -
  -                                     //There is work to do... 
  -                                     consumers[i].processMessages();
  -                             }
  -                     }
  -             } else {
  -                     throw new Exception("Multiple dispatch for a Queue");
  -             }
  -     }
  -
        public void close() throws Exception {
                closed = true;
  -     }
  -
  -     //One TemporaryDestination has been deleted
  -     public void deleteTemporaryDestination(SpyDestination dest) throws 
JMSException {
  -             if (closed)
  -                     throw new IllegalStateException("The connection is closed");
  -
  -             connection.deleteTemporaryDestination(dest);
        }
   
   }
  
  
  
  1.5       +24 -31    
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUILClient.java
  
  Index: ConnectionReceiverUILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUILClient.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- ConnectionReceiverUILClient.java  2000/12/12 05:58:51     1.4
  +++ ConnectionReceiverUILClient.java  2000/12/23 15:48:21     1.5
  @@ -31,19 +31,20 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
  -import org.spydermq.multiplexor.SocketMultiplexor;public class 
ConnectionReceiverUILClient implements ConnectionReceiver, Serializable
  +import org.spydermq.multiplexor.SocketMultiplexor;
  +import org.spydermq.ReceiveRequest;
  +
  +public class ConnectionReceiverUILClient implements ConnectionReceiver, Serializable
   {
  -     static final int RECEIVE = 1;
  -     static final int RECEIVE_MULTIPLE = 2;
  -     static final int DELETE_TEMPORARY_DESTINATION = 3;
  -     static final int CLOSE = 4;             
  -     
        private transient ObjectOutputStream out;
        private transient ObjectInputStream in;
        transient SocketMultiplexor mSocket;
        
  +     static final int m_close = 2;
  +     static final int m_deleteTemporaryDestination = 1;
  +     static final int m_receive = 3;
                
        void createConnection() throws RemoteException
        {
  @@ -81,40 +82,20 @@
                
                if( throwException != null )
                        throw throwException;
  -     }
  -
  -     public void receive(SpyDestination dest,SpyMessage mes) throws Exception
  -     {               
  -             if (out==null) createConnection();
  -             out.writeByte(RECEIVE);
  -             out.writeObject(dest);
  -             out.writeObject(mes);
  -             waitAnswer();
  -     }
  -             
  -     public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
Exception      
  -     {
  -             if (out==null) createConnection();
  -             out.writeByte(RECEIVE_MULTIPLE);
  -             out.writeObject(dest);
  -             out.writeInt(mes.length);
  -             for(int i=0;i<mes.length;i++)
  -                     out.writeObject(mes[i]);
  -             waitAnswer();
        }       
        
        public void deleteTemporaryDestination(SpyDestination dest) throws Exception
        {
  -             if (out==null) createConnection();
  -             out.writeByte(DELETE_TEMPORARY_DESTINATION);
  +             checkSocket();
  +             out.writeByte(m_deleteTemporaryDestination);
                out.writeObject(dest);
                waitAnswer();
        }       
        
        public void close() throws Exception
        {
  -             if (out==null) createConnection();
  -             out.writeByte(CLOSE);
  +             checkSocket();
  +             out.writeByte(m_close);
                waitAnswer();
        }
        
  @@ -122,4 +103,16 @@
        {
        }
        
  +     protected void checkSocket() throws Exception {
  +             if (out == null)
  +                     createConnection();
  +     }
  +
  +     public void receive(ReceiveRequest messages[]) throws Exception
  +     {               
  +             checkSocket();
  +             out.writeByte(m_receive);
  +             out.writeObject(messages);
  +             waitAnswer();
  +     }
   }
  
  
  
  1.9       +47 -38    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java
  
  Index: DistributedJMSServerOIL.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- DistributedJMSServerOIL.java      2000/12/19 06:43:37     1.8
  +++ DistributedJMSServerOIL.java      2000/12/23 15:48:21     1.9
  @@ -17,10 +17,11 @@
   import org.spydermq.SpyDestination;
   import org.spydermq.SpyDistributedConnection;
   import org.spydermq.Log;
  -import org.spydermq.SpyAcknowledgementItem;
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
   import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
  -
  +import org.spydermq.TransactionRequest;
  +import org.spydermq.AcknowledgementRequest;
  +import org.spydermq.Subscription;
   
   import java.rmi.RemoteException; 
   import java.net.ServerSocket;
  @@ -32,15 +33,13 @@
   import java.io.BufferedInputStream;
   import java.io.IOException;
   
  -import org.spydermq.TransactionRequest;
  -
   /**
    *   The OIL implementation of the DistributedJMSServer object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.8 $
  + *   @version $Revision: 1.9 $
    */
   public class DistributedJMSServerOIL extends DistributedJMSServerUIL
        implements DistributedJMSServerOILMBean
  @@ -89,55 +88,61 @@
                                
                                switch (code)
                                {
  -                                     case GetID: 
  -                                             result=server.getID();
  +                                     case m_setSpyDistributedConnection: 
  +                                             spyDistributedConnection = 
(SpyDistributedConnection)in.readObject();
                                                break;
  -                                     case NewMessage:                               
                 
  +                                     case m_acknowledge:
  +                                             
server.acknowledge(spyDistributedConnection, (AcknowledgementRequest)in.readObject());
  +                                             break;
  +                                     case m_addMessage:                             
                 
                                                
server.addMessage(spyDistributedConnection, (SpyMessage)in.readObject());
                                                break;
  -                                     case Subscribe:
  -                                             
server.subscribe(spyDistributedConnection, (Destination)in.readObject());
  +                                     case m_browse:                                 
         
  +                                             
result=server.browse(spyDistributedConnection, (Destination)in.readObject(), 
(String)in.readObject());
                                                break;
  -                                     case Unsubscribe: 
  -                                             
server.unsubscribe(spyDistributedConnection,(Destination)in.readObject());
  +                                     case m_checkID: 
  +                                             
server.checkID((String)in.readObject());
                                                break;
  -                                     case CreateTopic: 
  -                                             
result=(Topic)server.createTopic((String)in.readObject());
  +                                     case m_connectionClosing: 
  +                                             
server.connectionClosing(spyDistributedConnection);
  +                                             closed = true;
                                                break;
  -                                     case CreateQueue: 
  -                                             
result=(Queue)server.createQueue((String)in.readObject());
  +                                     case this.m_createQueue: 
  +                                             
result=(Queue)server.createQueue(spyDistributedConnection, (String)in.readObject());
                                                break;
  -                                     case GetTemporaryTopic:
  -                                             
result=(TemporaryTopic)server.getTemporaryTopic(spyDistributedConnection);
  +                                     case m_createTopic: 
  +                                             
result=(Topic)server.createTopic(spyDistributedConnection, (String)in.readObject());
                                                break;
  -                                     case GetTemporaryQueue: 
  -                                             
result=(TemporaryQueue)server.getTemporaryQueue(spyDistributedConnection);
  +                                     case m_deleteTemporaryDestination: 
  +                                             
server.deleteTemporaryDestination(spyDistributedConnection, 
(SpyDestination)in.readObject());
                                                break;
  -                                     case ConnectionClosing: 
  -                                             
server.connectionClosing(spyDistributedConnection,null);
  -                                             closed = true;
  +                                     case this.m_getID: 
  +                                             result=server.getID();
                                                break;
  -                                     case DeleteTemporaryDestination: 
  -                                             
server.deleteTemporaryDestination((SpyDestination)in.readObject());
  +                                     case m_getTemporaryQueue: 
  +                                             
result=(TemporaryQueue)server.getTemporaryQueue(spyDistributedConnection);
                                                break;
  -                                     case CheckID: 
  -                                             
server.checkID((String)in.readObject());
  +                                     case m_getTemporaryTopic:
  +                                             
result=(TemporaryTopic)server.getTemporaryTopic(spyDistributedConnection);
                                                break;
  -                                     case QueueReceive:
  -                                             
result=server.queueReceive(spyDistributedConnection,(Queue)in.readObject(), 
in.readLong());
  +                                     case m_listenerChange: 
  +                                             
server.listenerChange(spyDistributedConnection,in.readInt(),in.readBoolean());
                                                break;
  -                                     case ConnectionListening: 
  -                                             
server.connectionListening(spyDistributedConnection,in.readBoolean(),(Destination)in.readObject());
  +                                     case m_receive:
  +                                             
result=server.receive(spyDistributedConnection,in.readInt(), in.readLong());
                                                break;
  -                                     case Acknowledge:
  -                                             
server.acknowledge(spyDistributedConnection, (SpyAcknowledgementItem)in.readObject());
  +                                     case m_setEnabled :
  +                                             
server.setEnabled(spyDistributedConnection, in.readBoolean());
                                                break;
  -                                     case SetSpyDistributedConnection: 
  -                                             spyDistributedConnection = 
(SpyDistributedConnection)in.readObject();
  +                                     case m_subscribe:
  +                                             
server.subscribe(spyDistributedConnection, (Subscription)in.readObject());
                                                break;
  -                                     case Transact: 
  +                                     case m_transact:
                                                
server.transact(spyDistributedConnection, (TransactionRequest)in.readObject());
                                                break;
  +                                     case m_unsubscribe: 
  +                                             
server.unsubscribe(spyDistributedConnection,in.readInt());
  +                                             break;
                                        default:
                                                throw new RemoteException("Bad method 
code !");
                                }
  @@ -175,10 +180,14 @@
                                }                               
                        }
                }
  -             
  +                             
                try {
  -                     if( !closed )
  -                             
server.connectionClosing(spyDistributedConnection,null);
  +                     if( !closed ) {
  +                             try {
  +                                     
server.connectionClosing(spyDistributedConnection);
  +                             } catch (JMSException e) {
  +                             }
  +                     }
                        socket.close();
                } catch (IOException e ) {
                        Log.log("Could not gracefully close the connection with the 
client");
  
  
  
  1.6       +3 -2      
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java
  
  Index: DistributedJMSServerOILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- DistributedJMSServerOILClient.java        2000/12/12 05:58:50     1.5
  +++ DistributedJMSServerOILClient.java        2000/12/23 15:48:21     1.6
  @@ -18,8 +18,8 @@
   import org.spydermq.Log;
   import org.spydermq.SpyDistributedConnection;
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
  -import org.spydermq.SpyAcknowledgementItem;
   import org.spydermq.server.JMSServer;
  +import org.spydermq.AcknowledgementRequest;
   
   import java.rmi.RemoteException;
   import java.io.ObjectOutputStream;
  @@ -31,13 +31,14 @@
   import java.net.Socket;
   import java.net.InetAddress;
   
  +
   /**
    *The OIL implementation of the DistributedJMSServer object
    *      
    *@author Norbert Lataille ([EMAIL PROTECTED])
    *@author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *@version $Revision: 1.5 $
  + *@version $Revision: 1.6 $
    */
   public class DistributedJMSServerOILClient   extends DistributedJMSServerUILClient
        implements DistributedJMSServer, Serializable {
  
  
  
  1.5       +9 -6      
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMI.java
  
  Index: DistributedJMSServerRMI.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMI.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- DistributedJMSServerRMI.java      2000/12/19 06:43:37     1.4
  +++ DistributedJMSServerRMI.java      2000/12/23 15:48:21     1.5
  @@ -20,7 +20,7 @@
   import org.spydermq.SpyDestination;
   import org.spydermq.SpyDistributedConnection;
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
  -import org.spydermq.SpyAcknowledgementItem;
  +import org.spydermq.AcknowledgementRequest;
   
   /**
    *   The RMI interface of the DistributedJMSServer object
  @@ -28,9 +28,10 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public interface DistributedJMSServerRMI extends DistributedJMSServer, Remote {
  +
        // Public --------------------------------------------------------
        public String getID() throws RemoteException, Exception;
        public void checkID(String ID) throws RemoteException, Exception;
  @@ -38,14 +39,16 @@
        public void connectionClosing(SpyDistributedConnection dc) throws 
RemoteException, Exception;
        public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws 
RemoteException, Exception;
        public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws 
RemoteException, Exception;
  -     public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem 
item) throws RemoteException, Exception;
        public void addMessage(SpyDistributedConnection dc, SpyMessage message) throws 
RemoteException, Exception;
  -     public void connectionListening(SpyDistributedConnection dc, boolean mode, 
Destination dest) throws RemoteException, Exception;
        public Queue createQueue(SpyDistributedConnection dc, String dest) throws 
RemoteException, Exception;
        public Topic createTopic(SpyDistributedConnection dc, String dest) throws 
RemoteException, Exception;
        public void deleteTemporaryDestination(SpyDistributedConnection dc, 
SpyDestination dest) throws RemoteException, Exception;
  -     public SpyMessage queueReceive(SpyDistributedConnection dc, Queue queue, long 
wait) throws RemoteException, Exception;
  -     public void subscribe(SpyDistributedConnection dc, Destination dest) throws 
RemoteException, Exception;
  -     public void unsubscribe(SpyDistributedConnection dc, Destination dest) throws 
RemoteException, Exception;
        public void transact(SpyDistributedConnection dc, 
org.spydermq.TransactionRequest t) throws RemoteException, Exception;
  +     public void acknowledge(SpyDistributedConnection dc, AcknowledgementRequest 
item) throws RemoteException, Exception;
  +     public SpyMessage[] browse(SpyDistributedConnection dc, Destination dest, 
String selector) throws RemoteException, Exception;
  +     public void listenerChange(SpyDistributedConnection dc, int subscriberId, 
boolean state) throws RemoteException, Exception;
  +     public SpyMessage receive(SpyDistributedConnection dc, int subscriberId, long 
wait) throws RemoteException, Exception;
  +     public void setEnabled(SpyDistributedConnection dc, boolean enabled) throws 
RemoteException, Exception;
  +     public void subscribe(SpyDistributedConnection dc, org.spydermq.Subscription 
s) throws RemoteException, Exception;
  +     public void unsubscribe(SpyDistributedConnection dc, int subscriptionId ) 
throws RemoteException, Exception;
   }
  
  
  
  1.7       +33 -29    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java
  
  Index: DistributedJMSServerRMIImpl.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- DistributedJMSServerRMIImpl.java  2000/12/19 06:43:37     1.6
  +++ DistributedJMSServerRMIImpl.java  2000/12/23 15:48:21     1.7
  @@ -22,9 +22,9 @@
   import org.spydermq.Log;
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
   import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
  -import org.spydermq.SpyAcknowledgementItem;
   import org.spydermq.server.JMSServer;
   import org.spydermq.TransactionRequest;
  +import org.spydermq.AcknowledgementRequest;
   
   /**
    *   The RMI implementation of the DistributedJMSServer object
  @@ -32,7 +32,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.6 $
  + *   @version $Revision: 1.7 $
    */
   public class DistributedJMSServerRMIImpl 
        extends UnicastRemoteObject 
  @@ -63,17 +63,17 @@
        
        public Topic createTopic(SpyDistributedConnection dc, String dest) throws 
JMSException
        {
  -             return server.createTopic(dest);
  +             return server.createTopic(dc,dest);
        }
        
        public Queue createQueue(SpyDistributedConnection dc, String dest) throws 
JMSException
        {
  -             return server.createQueue(dest);
  +             return server.createQueue(dc,dest);
        }
   
        public void deleteTemporaryDestination(SpyDistributedConnection dc, 
SpyDestination dest) throws JMSException
        {
  -             server.deleteTemporaryDestination(dest);
  +             server.deleteTemporaryDestination(dc,dest);
        }
   
        public void checkID(String ID) throws JMSException
  @@ -98,18 +98,9 @@
                // to access the same remote object via RMI
        }
        
  -     public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem 
item) throws JMSException, RemoteException{
  -             server.acknowledge(dc, item);
  -     }       
  -     
        public void connectionClosing(SpyDistributedConnection dc) throws JMSException
  -     {
  -             server.connectionClosing(dc,null);
  -     }       
  -     
  -     public void connectionListening(SpyDistributedConnection dc,boolean 
mode,Destination dest) throws JMSException
        {
  -             server.connectionListening(dc,mode,dest);
  +             server.connectionClosing(dc);
        }       
        
        public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws 
JMSException
  @@ -122,22 +113,35 @@
                return server.getTemporaryTopic(dc);
        }       
        
  -     public SpyMessage queueReceive(SpyDistributedConnection dc, Queue queue, long 
wait) throws JMSException
  -     {
  -             return server.queueReceive(dc,queue, wait);
  -     }       
  -     
  -     public void subscribe(SpyDistributedConnection dc, Destination dest) throws 
JMSException
  -     {
  -             server.subscribe(dc,dest);
  -     }       
  -     
  -     public void unsubscribe(SpyDistributedConnection dc, Destination dest) throws 
JMSException
  -     {
  -             server.unsubscribe(dc,dest);
  -     }
  -     
        public void transact(org.spydermq.SpyDistributedConnection dc, 
TransactionRequest t) throws JMSException {
                server.transact(dc,t);
  +     }
  +
  +     public void acknowledge(SpyDistributedConnection dc, AcknowledgementRequest 
item) throws Exception {
  +             server.acknowledge(dc, item);
  +     }
  +
  +     public SpyMessage[] browse(SpyDistributedConnection dc, Destination dest, 
String selector) throws Exception {
  +             return server.browse(dc, dest, selector);
  +     }
  +
  +     public void listenerChange(SpyDistributedConnection dc, int subscriberId, 
boolean state) throws Exception {
  +             server.listenerChange(dc,subscriberId,state);
  +     }
  +
  +     public SpyMessage receive(SpyDistributedConnection dc, int subscriberId, long 
wait) throws Exception {
  +             return server.receive(dc,subscriberId,wait);
  +     }
  +
  +     public void setEnabled(SpyDistributedConnection dc, boolean enabled) throws 
Exception {
  +             server.setEnabled( dc, enabled);        
  +     }
  +
  +     public void subscribe(SpyDistributedConnection dc, org.spydermq.Subscription 
s) throws Exception {
  +             server.subscribe(dc,s);
  +     }
  +
  +     public void unsubscribe(SpyDistributedConnection dc, int subscriptionId ) 
throws Exception{
  +             server.unsubscribe(dc, subscriptionId);
        }
   }
  
  
  
  1.7       +70 -65    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUIL.java
  
  Index: DistributedJMSServerUIL.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUIL.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- DistributedJMSServerUIL.java      2000/12/19 06:43:37     1.6
  +++ DistributedJMSServerUIL.java      2000/12/23 15:48:21     1.7
  @@ -17,12 +17,13 @@
   import org.spydermq.SpyDestination;
   import org.spydermq.SpyDistributedConnection;
   import org.spydermq.Log;
  -import org.spydermq.SpyAcknowledgementItem;
   import org.spydermq.server.JMSServer;
  -
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
   import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
   import org.spydermq.multiplexor.SocketMultiplexor;
  +import org.spydermq.TransactionRequest;
  +import org.spydermq.AcknowledgementRequest;
  +import org.spydermq.Subscription;
   
   import java.rmi.RemoteException; 
   import java.net.ServerSocket;
  @@ -34,24 +35,40 @@
   import java.io.BufferedInputStream;
   import java.io.IOException;
   
  -import org.spydermq.TransactionRequest;
  -
   /**
    *   The UIL implementation of the DistributedJMSServer object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.6 $
  + *   @version $Revision: 1.7 $
    */
   public class DistributedJMSServerUIL
        implements Runnable, DistributedJMSServerSetup, DistributedJMSServerUILMBean
   {
  -
  -     // Attributes ----------------------------------------------------
  +     static final int m_acknowledge = 1;
  +     static final int m_addMessage = 2;
  +     static final int m_browse = 3;
  +     static final int m_checkID = 4;
  +     static final int m_connectionClosing = 5;
  +     static final int m_createQueue = 6;
  +     static final int m_createTopic = 7;
  +     static final int m_deleteTemporaryDestination = 8;
  +     static final int m_getID = 9;
  +     static final int m_getTemporaryQueue = 10;
  +     static final int m_getTemporaryTopic = 11;
  +     static final int m_listenerChange = 12;
  +     static final int m_receive = 13;
  +     static final int m_setEnabled = 14;
  +     static final int m_setSpyDistributedConnection = 15;
  +     static final int m_subscribe = 16;
  +     static final int m_transact = 17;
  +     static final int m_unsubscribe = 18;
   
        //The server implementation
        protected static JMSServer server;
  +
  +     protected ServerSocket serverSocket;
        
        // Constructor ---------------------------------------------------
        
  @@ -60,27 +77,6 @@
                exportObject();
        }
   
  -     // Internals -----------------------------------------------------
  -
  -     static final int GetID = 1;
  -     static final int NewMessage = 2;
  -     static final int Subscribe = 3;
  -     static final int Unsubscribe = 4;
  -     static final int CreateTopic = 5;
  -     static final int CreateQueue = 6;
  -     static final int GetTemporaryTopic = 7;
  -     static final int GetTemporaryQueue = 8;
  -     static final int ConnectionClosing = 9;
  -     static final int DeleteTemporaryDestination = 10;
  -     static final int CheckID = 11;
  -     static final int QueueReceive = 12;     
  -     static final int ConnectionListening = 13;
  -     static final int Acknowledge = 14;      
  -     static final int SetSpyDistributedConnection = 15;
  -     static final int Transact = 16;
  -     
  -     protected ServerSocket serverSocket;
  -
        void exportObject()     
        {
                try {
  @@ -134,59 +130,65 @@
                                
                                switch (code)
                                {
  -                                     case GetID: 
  -                                             result=server.getID();
  +                                     case m_setSpyDistributedConnection: 
  +                                             spyDistributedConnection = 
(SpyDistributedConnection)in.readObject();
  +                                             if( spyDistributedConnection.cr 
instanceof ConnectionReceiverUILClient ) {
  +                                                     
((ConnectionReceiverUILClient)spyDistributedConnection.cr).mSocket = mSocket;
  +                                                     
((ConnectionReceiverUILClient)spyDistributedConnection.cr).createConnection();
  +                                             }
                                                break;
  -                                     case NewMessage:                               
                 
  +                                     case m_acknowledge:
  +                                             
server.acknowledge(spyDistributedConnection, (AcknowledgementRequest)in.readObject());
  +                                             break;
  +                                     case m_addMessage:                             
                 
                                                
server.addMessage(spyDistributedConnection, (SpyMessage)in.readObject());
                                                break;
  -                                     case Subscribe:
  -                                             
server.subscribe(spyDistributedConnection, (Destination)in.readObject());
  +                                     case m_browse:                                 
         
  +                                             
result=server.browse(spyDistributedConnection, (Destination)in.readObject(), 
(String)in.readObject());
                                                break;
  -                                     case Unsubscribe: 
  -                                             
server.unsubscribe(spyDistributedConnection,(Destination)in.readObject());
  +                                     case m_checkID: 
  +                                             
server.checkID((String)in.readObject());
                                                break;
  -                                     case CreateTopic: 
  -                                             
result=(Topic)server.createTopic((String)in.readObject());
  +                                     case m_connectionClosing: 
  +                                             
server.connectionClosing(spyDistributedConnection);
  +                                             closed = true;
                                                break;
  -                                     case CreateQueue: 
  -                                             
result=(Queue)server.createQueue((String)in.readObject());
  +                                     case this.m_createQueue: 
  +                                             
result=(Queue)server.createQueue(spyDistributedConnection, (String)in.readObject());
                                                break;
  -                                     case GetTemporaryTopic:
  -                                             
result=(TemporaryTopic)server.getTemporaryTopic(spyDistributedConnection);
  +                                     case m_createTopic: 
  +                                             
result=(Topic)server.createTopic(spyDistributedConnection, (String)in.readObject());
                                                break;
  -                                     case GetTemporaryQueue: 
  -                                             
result=(TemporaryQueue)server.getTemporaryQueue(spyDistributedConnection);
  +                                     case m_deleteTemporaryDestination: 
  +                                             
server.deleteTemporaryDestination(spyDistributedConnection, 
(SpyDestination)in.readObject());
                                                break;
  -                                     case ConnectionClosing: 
  -                                             
server.connectionClosing(spyDistributedConnection,null);
  -                                             closed = true;
  +                                     case this.m_getID: 
  +                                             result=server.getID();
                                                break;
  -                                     case DeleteTemporaryDestination: 
  -                                             
server.deleteTemporaryDestination((SpyDestination)in.readObject());
  +                                     case m_getTemporaryQueue: 
  +                                             
result=(TemporaryQueue)server.getTemporaryQueue(spyDistributedConnection);
                                                break;
  -                                     case CheckID: 
  -                                             
server.checkID((String)in.readObject());
  +                                     case m_getTemporaryTopic:
  +                                             
result=(TemporaryTopic)server.getTemporaryTopic(spyDistributedConnection);
                                                break;
  -                                     case QueueReceive:
  -                                             
result=server.queueReceive(spyDistributedConnection,(Queue)in.readObject(), 
in.readLong());
  +                                     case m_listenerChange: 
  +                                             
server.listenerChange(spyDistributedConnection,in.readInt(),in.readBoolean());
                                                break;
  -                                     case ConnectionListening: 
  -                                             
server.connectionListening(spyDistributedConnection,in.readBoolean(),(Destination)in.readObject());
  +                                     case m_receive:
  +                                             
result=server.receive(spyDistributedConnection,in.readInt(), in.readLong());
                                                break;
  -                                     case Acknowledge:
  -                                             
server.acknowledge(spyDistributedConnection, (SpyAcknowledgementItem)in.readObject());
  +                                     case m_setEnabled :
  +                                             
server.setEnabled(spyDistributedConnection, in.readBoolean());
                                                break;
  -                                     case SetSpyDistributedConnection: 
  -                                             spyDistributedConnection = 
(SpyDistributedConnection)in.readObject();
  -                                             if( spyDistributedConnection.cr 
instanceof ConnectionReceiverUILClient ) {
  -                                                     
((ConnectionReceiverUILClient)spyDistributedConnection.cr).mSocket = mSocket;
  -                                                     
((ConnectionReceiverUILClient)spyDistributedConnection.cr).createConnection();
  -                                             }
  +                                     case m_subscribe:
  +                                             
server.subscribe(spyDistributedConnection, (Subscription)in.readObject());
                                                break;
  -                                     case Transact:
  +                                     case m_transact:
                                                
server.transact(spyDistributedConnection, (TransactionRequest)in.readObject());
                                                break;
  +                                     case m_unsubscribe: 
  +                                             
server.unsubscribe(spyDistributedConnection,in.readInt());
  +                                             break;
                                        default:
                                                throw new RemoteException("Bad method 
code !");
                                }
  @@ -227,9 +229,12 @@
   
                try {
   
  -                     if( !closed )
  -                             
server.connectionClosing(spyDistributedConnection,null);
  -
  +                     if( !closed ) {
  +                             try {
  +                             server.connectionClosing(spyDistributedConnection);
  +                             } catch ( JMSException ignore ) {
  +                             }
  +                     }
                        mSocket.close();
                } catch ( IOException e ) {
                        Log.log("Could not gracefully close the connection to the 
server.");
  
  
  
  1.6       +68 -43    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILClient.java
  
  Index: DistributedJMSServerUILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILClient.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- DistributedJMSServerUILClient.java        2000/12/19 06:43:37     1.5
  +++ DistributedJMSServerUILClient.java        2000/12/23 15:48:21     1.6
  @@ -17,8 +17,8 @@
   import org.spydermq.SpyDestination;
   import org.spydermq.Log;
   import org.spydermq.SpyDistributedConnection;
  -import org.spydermq.SpyAcknowledgementItem;
   
  +
   import org.spydermq.server.JMSServer;
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
   import org.spydermq.multiplexor.SocketMultiplexor;
  @@ -34,6 +34,7 @@
   import java.net.InetAddress;
   
   import org.spydermq.TransactionRequest;
  +import org.spydermq.AcknowledgementRequest;
   
   /**
    *The UIL implementation of the DistributedJMSServer object
  @@ -41,26 +42,28 @@
    *@author Norbert Lataille ([EMAIL PROTECTED])
    *@author Hiram Chirino ([EMAIL PROTECTED])
    *
  - *@version $Revision: 1.5 $
  + *@version $Revision: 1.6 $
    */
   public class DistributedJMSServerUILClient implements DistributedJMSServer, Serializable {
   
  -     static final int GetID = 1;
  -     static final int NewMessage = 2;
  -     static final int Subscribe = 3;
  -     static final int Unsubscribe = 4;
  -     static final int CreateTopic = 5;
  -     static final int CreateQueue = 6;
  -     static final int GetTemporaryTopic = 7;
  -     static final int GetTemporaryQueue = 8;
  -     static final int ConnectionClouting = 9;
  -     static final int DeleteTemporaryDestination = 10;
  -     static final int CheckID = 11;
  -     static final int QueueReceive = 12;
  -     static final int ConnectionListening = 13;
  -     static final int Acknowledge = 14;
  -     static final int SetSpyDistributedConnection = 15;
  -     static final int Transact = 16;
  +     static final int m_acknowledge = 1;
  +     static final int m_addMessage = 2;
  +     static final int m_browse = 3;
  +     static final int m_checkID = 4;
  +     static final int m_connectionClosing = 5;
  +     static final int m_createQueue = 6;
  +     static final int m_createTopic = 7;
  +     static final int m_deleteTemporaryDestination = 8;
  +     static final int m_getID = 9;
  +     static final int m_getTemporaryQueue = 10;
  +     static final int m_getTemporaryTopic = 11;
  +     static final int m_listenerChange = 12;
  +     static final int m_receive = 13;
  +     static final int m_setEnabled = 14;
  +     static final int m_setSpyDistributedConnection = 15;
  +     static final int m_subscribe = 16;
  +     static final int m_transact = 17;
  +     static final int m_unsubscribe = 18;
   
        //Remote stuff
        protected int port;
  @@ -117,7 +120,7 @@
        public String getID() throws Exception {
                checkConnection();
                try {
  -                     out.writeByte(GetID);
  +                     out.writeByte(m_getID);
                } catch (IOException e) {
                        failure(e);
                }
  @@ -127,7 +130,7 @@
        public void checkID(String ID) throws JMSException, RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(CheckID);
  +                     out.writeByte(m_checkID);
                        out.writeObject(ID);
                } catch (IOException e) {
                        failure(e);
  @@ -138,7 +141,7 @@
        public void setSpyDistributedConnection(SpyDistributedConnection dest) throws RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(SetSpyDistributedConnection);
  +                     out.writeByte(m_setSpyDistributedConnection);
                        out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
  @@ -149,7 +152,7 @@
        public void connectionClosing(SpyDistributedConnection dc) throws JMSException, RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(ConnectionClouting);
  +                     out.writeByte(m_connectionClosing);
                } catch (IOException e) {
                        failure(e);
                }
  @@ -159,7 +162,7 @@
        public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws JMSException, RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(GetTemporaryQueue);
  +                     out.writeByte(m_getTemporaryQueue);
                } catch (IOException e) {
                        failure(e);
                }
  @@ -169,17 +172,17 @@
        public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws JMSException, RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(GetTemporaryTopic);
  +                     out.writeByte(m_getTemporaryTopic);
                } catch (IOException e) {
                        failure(e);
                }
                return (TemporaryTopic) waitAnswer();
        }
   
  -     public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem item) throws JMSException, RemoteException {
  +     public void acknowledge(SpyDistributedConnection dc, AcknowledgementRequest item) throws JMSException, RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(Acknowledge);
  +                     out.writeByte(m_acknowledge);
                        out.writeObject(item);
                } catch (IOException e) {
                        failure(e);
  @@ -190,7 +193,7 @@
        public void addMessage(SpyDistributedConnection dc, SpyMessage val) throws JMSException, RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(NewMessage);
  +                     out.writeByte(m_addMessage);
                        out.writeObject(val);
                } catch (IOException e) {
                        failure(e);
  @@ -203,12 +206,12 @@
                createConnection();
        }
   
  -     public void connectionListening(SpyDistributedConnection dc, boolean mode, Destination dest) throws Exception, RemoteException {
  +     public void listenerChange(SpyDistributedConnection dc, int subscriberId, boolean state) throws Exception, RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(ConnectionListening);
  -                     out.writeBoolean(mode);
  -                     out.writeObject(dest);
  +                     out.writeByte(m_listenerChange);
  +                     out.writeInt(subscriberId);
  +                     out.writeBoolean(state);
                } catch (IOException e) {
                        failure(e);
                }
  @@ -218,7 +221,7 @@
        public Queue createQueue(SpyDistributedConnection dc, String dest) throws JMSException, RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(CreateQueue);
  +                     out.writeByte(m_createQueue);
                        out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
  @@ -229,7 +232,7 @@
        public Topic createTopic(SpyDistributedConnection dc, String dest) throws JMSException, RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(CreateTopic);
  +                     out.writeByte(m_createTopic);
                        out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
  @@ -240,7 +243,7 @@
        public void deleteTemporaryDestination(SpyDistributedConnection dc, SpyDestination dest) throws JMSException, RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(DeleteTemporaryDestination);
  +                     out.writeByte(m_deleteTemporaryDestination);
                        out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
  @@ -248,11 +251,11 @@
                waitAnswer();
        }
   
  -     public SpyMessage queueReceive(SpyDistributedConnection dc, Queue queue, long wait) throws Exception, RemoteException {
  +     public SpyMessage receive(SpyDistributedConnection dc, int subscriberId, long wait) throws Exception, RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(QueueReceive);
  -                     out.writeObject(queue);
  +                     out.writeByte(m_receive);
  +                     out.writeInt(subscriberId);
                        out.writeLong(wait);
                } catch (IOException e) {
                        failure(e);
  @@ -260,11 +263,11 @@
                return (SpyMessage) waitAnswer();
        }
   
  -     public void subscribe(SpyDistributedConnection dc, Destination dest) throws JMSException, RemoteException {
  +     public void subscribe(SpyDistributedConnection dc, org.spydermq.Subscription s) throws JMSException, RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(Subscribe);
  -                     out.writeObject(dest);
  +                     out.writeByte(m_subscribe);
  +                     out.writeObject(s);
                } catch (IOException e) {
                        failure(e);
                }
  @@ -272,22 +275,44 @@
        }
   
   
  +     public void unsubscribe(SpyDistributedConnection dc, int subscriptionId ) throws JMSException, RemoteException {
  +             checkConnection();
  +             try {
  +                     out.writeByte(m_unsubscribe);
  +                     out.writeInt(subscriptionId);
  +             } catch (IOException e) {
  +                     failure(e);
  +             }
  +             waitAnswer();
  +     }
   
  -     public void unsubscribe(SpyDistributedConnection dc, Destination dest) throws JMSException, RemoteException {
  +     public void setEnabled(SpyDistributedConnection dc, boolean enabled) throws JMSException, RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(Unsubscribe);
  -                     out.writeObject(dest);
  +                     out.writeByte(m_setEnabled);
  +                     out.writeBoolean(enabled);
                } catch (IOException e) {
                        failure(e);
                }
                waitAnswer();
        }
   
  +     public SpyMessage[] browse(SpyDistributedConnection dc, Destination dest, String selector) throws JMSException, RemoteException {
  +             checkConnection();
  +             try {
  +                     out.writeByte(m_browse);
  +                     out.writeObject(dest);
  +                     out.writeObject(selector);
  +             } catch (IOException e) {
  +                     failure(e);
  +             }
  +             return (SpyMessage[]) waitAnswer();
  +     }
  +
        public void transact(org.spydermq.SpyDistributedConnection dc, TransactionRequest t) throws JMSException, RemoteException {
                checkConnection();
                try {
  -                     out.writeByte(Transact);
  +                     out.writeByte(m_transact);
                        out.writeObject(t);
                } catch (IOException e) {
                        failure(e);
  
  
  

Reply via email to