User: norbert 
  Date: 00/06/18 21:23:15

  Modified:    src/java/org/spydermq/distributed/server
                        ConnectionReceiverOIL.java
                        ConnectionReceiverOILClient.java
                        ConnectionReceiverRMIImpl.java
                        DistributedJMSServerRMIImpl.java
  Added:       src/java/org/spydermq/distributed/server
                        DistributedJMSServerOIL.java
                        DistributedJMSServerOILClient.java
  Log:
  More work for the OIL
  
  Revision  Changes    Path
  1.4       +21 -6     
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
  
  Index: ConnectionReceiverOIL.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- ConnectionReceiverOIL.java        2000/06/15 23:21:40     1.3
  +++ ConnectionReceiverOIL.java        2000/06/19 04:23:14     1.4
  @@ -38,10 +38,10 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class ConnectionReceiverOIL 
  -     implements ConnectionReceiver, Runnable
  +     implements Runnable
   {
        // Attributes ----------------------------------------------------
   
  @@ -59,7 +59,7 @@
        }
   
        // Internals -----------------------------------------------------
  -     // Should be hold in an "extention" and the ConnectionReceiver logic should be 
in a separated object
  +     // Should be hold in an "extension" and the ConnectionReceiver logic should be 
in a separated object
        // We could generate a dynamic proxy for that...
        
        static final int RECEIVE = 1;
  @@ -67,7 +67,7 @@
        static final int DELETE_TEMPORARY_DESTINATION = 3;
        static final int CLOSE = 4;             
        
  -     ServerSocket serverSocket;
  +     private ServerSocket serverSocket;
   
        void exportObject()     
        {
  @@ -166,8 +166,7 @@
        }
        
        public ConnectionReceiver createClient() throws Exception
  -     {
  -             
  +     {               
                return new 
ConnectionReceiverOILClient(InetAddress.getLocalHost(),serverSocket.getLocalPort());
        }
        
  @@ -178,6 +177,21 @@
                this.connection=connection;
        }
        
  +     //<DEBUG>
  +     
  +     /*public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
  +     {
  +             connection.rec++;
  +     }
  +     
  +     public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
JMSException
  +     {
  +             connection.rec++;
  +     }*/
  +     
  +     //</DEBUG>
  +     
  +     
        //A message has arrived for this Connection, We have to dispatch it to the 
sessions
       public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
        {               
  @@ -254,6 +268,7 @@
                        
                }
        } 
  +     
   
        public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
JMSException
        {
  
  
  
  1.5       +1 -4      
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java
  
  Index: ConnectionReceiverOILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- ConnectionReceiverOILClient.java  2000/06/15 23:27:48     1.4
  +++ ConnectionReceiverOILClient.java  2000/06/19 04:23:14     1.5
  @@ -56,6 +56,7 @@
        public void waitAnswer() throws Exception
        {
                try {
  +                     os.flush();
                        int val=is.read();
                        if (val==1) {                           
                                String st=(String)in.readObject();
  @@ -76,7 +77,6 @@
                os.write(RECEIVE);
                out.writeObject(dest);
                out.writeObject(mes);
  -             os.flush();
                waitAnswer();
        }
                
  @@ -86,7 +86,6 @@
                os.write(RECEIVE_MULTIPLE);
                out.writeObject(dest);
                out.writeObject(mes);
  -             os.flush();
                waitAnswer();
        }       
        
  @@ -95,7 +94,6 @@
                if (socket==null) createConnection();
                os.write(DELETE_TEMPORARY_DESTINATION);
                out.writeObject(dest);
  -             os.flush();
                waitAnswer();
        }       
       
  @@ -103,7 +101,6 @@
        {
                if (socket==null) createConnection();
                os.write(CLOSE);
  -             os.flush();
                waitAnswer();
        }
        
  
  
  
  1.10      +16 -1     
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java
  
  Index: ConnectionReceiverRMIImpl.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- ConnectionReceiverRMIImpl.java    2000/06/15 22:50:31     1.9
  +++ ConnectionReceiverRMIImpl.java    2000/06/19 04:23:14     1.10
  @@ -30,7 +30,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.9 $
  + *   @version $Revision: 1.10 $
    */
   public class ConnectionReceiverRMIImpl 
        extends UnicastRemoteObject 
  @@ -57,6 +57,21 @@
        {
                this.connection=connection;
        }
  +     
  +     //<DEBUG>
  +     
  +     /*public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
  +     {
  +             connection.rec++;       
  +     }
  +
  +     public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
JMSException
  +     {
  +             connection.rec++;
  +     }*/
  +             
  +     //</DEBUG>
  +
        
        //A message has arrived for this Connection, We have to dispatch it to the 
sessions
       public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
  
  
  
  1.3       +16 -10    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java
  
  Index: DistributedJMSServerRMIImpl.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- DistributedJMSServerRMIImpl.java  2000/06/14 23:21:00     1.2
  +++ DistributedJMSServerRMIImpl.java  2000/06/19 04:23:15     1.3
  @@ -19,15 +19,19 @@
   import org.spydermq.Log;
   import java.rmi.server.UnicastRemoteObject;
   import java.rmi.RemoteException;
  +import org.spydermq.distributed.interfaces.DistributedJMSServer;
  +import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
   
   /**
    *   The RMI implementation of the DistributedJMSServer object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
  -public class DistributedJMSServerRMIImpl extends UnicastRemoteObject implements 
DistributedJMSServerRMI, DistributedJMSServerRMIImplMBean
  +public class DistributedJMSServerRMIImpl 
  +     extends UnicastRemoteObject 
  +     implements DistributedJMSServerRMI, DistributedJMSServerRMIImplMBean, 
DistributedJMSServerSetup
   { 
        // Attributes ----------------------------------------------------
   
  @@ -43,11 +47,6 @@
   
        // Public --------------------------------------------------------
   
  -     public void setServer(JMSServer s)
  -     {
  -             server=s;
  -     }
  -     
        public String getID() throws JMSException
        {
                return server.getID();
  @@ -112,10 +111,17 @@
        {
                server.connectionListening(mode,dest,dc);
        }
  +
  +     // --
  +     
  +     public DistributedJMSServer createClient()
  +     {
  +             return this;
  +     }       
   
  -     public void finalize()
  +     public void setServer(JMSServer s)
        {
  -             Log.log("Distributed.finalize()");
  +             server=s;
        }
  -
  +     
   }
  
  
  
  1.1                  
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java
  
  Index: DistributedJMSServerOIL.java
  ===================================================================
  package org.spydermq.distributed.server;
  
  import javax.jms.JMSException;
  import javax.jms.Destination;
  import javax.jms.Topic;
  import javax.jms.Queue;
  import javax.jms.TemporaryTopic;
  import javax.jms.TemporaryQueue;
  import org.spydermq.SpyMessage;
  import org.spydermq.SpyDestination;
  import org.spydermq.JMSServer;
  import org.spydermq.SpyDistributedConnection;
  import org.spydermq.Log;
  import org.spydermq.distributed.interfaces.DistributedJMSServer;
  import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
  import java.rmi.RemoteException; 
  import java.net.ServerSocket;
  import java.net.Socket;
  import java.net.InetAddress;
  import java.io.ObjectOutputStream;
  import java.io.BufferedOutputStream;
  import java.io.ObjectInputStream;
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.io.IOException;
  
  public class DistributedJMSServerOIL
        implements Runnable, DistributedJMSServerSetup
  {
  
        // Attributes ----------------------------------------------------
  
        //The server implementation
        private JMSServer server;
        
        // Constructor ---------------------------------------------------
        
        public DistributedJMSServerOIL() 
        {
                exportObject();
        }
  
        // Internals -----------------------------------------------------
        
        static final int GetID = 1;
        static final int NewMessage = 2;
        static final int Subscribe = 3;
        static final int Unsubscribe = 4;
        static final int CreateTopic = 5;
        static final int CreateQueue = 6;
        static final int GetTemporaryTopic = 7;
        static final int GetTemporaryQueue = 8;
        static final int ConnectionClosing = 9;
        static final int DeleteTemporaryDestination = 10;
        static final int CheckID = 11;
        static final int QueueReceiveNoWait = 12;
        static final int ConnectionListening = 13;
        
        private ServerSocket serverSocket;
  
        void exportObject()     
        {
                try {
                         serverSocket = new ServerSocket(0);
                         new Thread(this).start();
                } catch (IOException e) {
                        failure("Initialization",e);
                }
        }
        
        public void run()
        {
                Socket socket = null;
                int code = 0;
                InputStream is=null;
                OutputStream os=null;
                ObjectOutputStream out=null;
                ObjectInputStream in=null;
                      
                try {
                        socket = serverSocket.accept();
                  
                        new Thread(this).start();
  
                        is = socket.getInputStream();
                        os = socket.getOutputStream();
                        out = new ObjectOutputStream(os); 
                        in = new ObjectInputStream(is);
  
                } catch (IOException e) {
                        failure("Initialisation",e);
                        return;
                }
  
                while (true) {
  
                        try {
                                code=is.read();         
                        } catch (IOException e) {
                                failure("Command read",e);
                                return;
                        }
                
                        try {
                                
                                Object result=null;                                    
 
                                
                                switch (code)
                                {
                                        case GetID: 
                                                result=server.getID();
                                                break;
                                        case NewMessage: 
                                                
server.newMessage((SpyMessage[])in.readObject(),(String)in.readObject());
                                                break;
                                        case Subscribe: 
                                                
server.subscribe((Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
                                                break;
                                        case Unsubscribe: 
                                                
server.unsubscribe((Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
                                                break;
                                        case CreateTopic: 
                                                
result=(Topic)server.createTopic((String)in.readObject());
                                                break;
                                        case CreateQueue: 
                                                
result=(Queue)server.createQueue((String)in.readObject());
                                                break;
                                        case GetTemporaryTopic:
                                                
result=(TemporaryTopic)server.getTemporaryTopic((SpyDistributedConnection)in.readObject());
                                                break;
                                        case GetTemporaryQueue: 
                                                
result=(TemporaryQueue)server.getTemporaryQueue((SpyDistributedConnection)in.readObject());
                                                break;
                                        case ConnectionClosing: 
                                                
server.connectionClosing((SpyDistributedConnection)in.readObject(),null);
                                                break;
                                        case DeleteTemporaryDestination: 
                                                
server.deleteTemporaryDestination((SpyDestination)in.readObject());
                                                break;
                                        case CheckID: 
                                                
server.checkID((String)in.readObject());
                                                break;
                                        case QueueReceiveNoWait:
                                                
result=server.queueReceiveNoWait((Queue)in.readObject());
                                                break;
                                        case ConnectionListening: 
                                                
server.connectionListening(((is.read())==1),(Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
                                                break;
                                        default:
                                                throw new RemoteException("Bad method 
code !");
                                }
                                
                                //Everthing was OK
                                
                                try {
                                        if (result==null) os.write(0);
                                        else {
                                                os.write(1);
                                                out.writeObject(result);
                                        }
                                        os.flush();
                                } catch (IOException e) {
                                        failure("Result write",e);
                                        return;                                 
                                }
                                
                        } catch (Exception e) {
  
                                try {
                                        os.write(2);
                                        out.writeObject(e.getMessage());
                                        os.flush();
                                } catch (IOException e2) {
                                        failure("Result write",e2);
                                        return;                                 
                                }
                                
                        }
  
                }
        }
        
        void failure(String st,Exception e)
        {
                Log.error("Closing socket: "+st);
                Log.error(e);
        }
        
        // --
        
        public DistributedJMSServer createClient() throws Exception
        {               
                return new 
DistributedJMSServerOILClient(InetAddress.getLocalHost(),serverSocket.getLocalPort());
        }
        
        public void setServer(JMSServer s)
        {
                server=s;
        }
        
  }
  
  
  
  1.1                  
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java
  
  Index: DistributedJMSServerOILClient.java
  ===================================================================
  package org.spydermq.distributed.server;
  
  import javax.jms.JMSException;
  import javax.jms.Destination;
  import javax.jms.Topic;
  import javax.jms.Queue;
  import javax.jms.TemporaryTopic;
  import javax.jms.TemporaryQueue;
  import org.spydermq.SpyMessage;
  import org.spydermq.SpyDestination;
  import org.spydermq.JMSServer;
  import org.spydermq.Log;
  import org.spydermq.SpyDistributedConnection;
  import java.rmi.RemoteException;
  import java.io.ObjectOutputStream;
  import java.io.ObjectInputStream;
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.io.IOException;
  import java.io.Serializable;
  import java.net.Socket;
  import java.net.InetAddress;
  import org.spydermq.distributed.interfaces.DistributedJMSServer;
  
  /**
   *   The OIL (plain socket) client stub of the DistributedJMSServer object.
   *
   *   Only the address and the port of the remote object are serialized. The
   *   socket and its streams are transient and are (re)opened lazily by the
   *   first remote call. A request is one raw byte holding the method code,
   *   followed by the serialized arguments; the answer is decoded by
   *   waitAnswer().
   *
   *   This class does no locking: one request/answer exchange must be finished
   *   before the next one is started on the same instance.
   */
  public class DistributedJMSServerOILClient
        implements DistributedJMSServer, Serializable
  {
        
        //Method codes. They must match the ones of DistributedJMSServerOIL
        static final int GetID = 1;
        static final int NewMessage = 2;
        static final int Subscribe = 3;
        static final int Unsubscribe = 4;
        static final int CreateTopic = 5;
        static final int CreateQueue = 6;
        static final int GetTemporaryTopic = 7;
        static final int GetTemporaryQueue = 8;
        static final int ConnectionClosing = 9;
        static final int DeleteTemporaryDestination = 10;
        static final int CheckID = 11;
        static final int QueueReceiveNoWait = 12;
        static final int ConnectionListening = 13;
        
        //Remote stuff
        
        private transient Socket socket;
        private transient InputStream is;
        private transient OutputStream os;
        private transient ObjectOutputStream out;
        private transient ObjectInputStream in;
        
        private int port;
        private InetAddress addr;
        
        public DistributedJMSServerOILClient(InetAddress addr,int port)
        {
                socket=null;
                this.port=port;
                this.addr=addr;
        }
        
        /**
         *   Opens the socket and the streams. The fields are only set once
         *   everything has been built, so a failure never leaves a half
         *   initialized connection behind (and the socket is closed).
         *
         *   @throws RemoteException  if the remote object cannot be reached
         */
        void createConnection() throws RemoteException
        {
                Socket newSocket=null;
                
                try {                   
                        newSocket=new Socket(addr,port);
                        InputStream rawIn=newSocket.getInputStream();
                        OutputStream rawOut=newSocket.getOutputStream();
                        //Send our stream header first, then wait for the peer's one:
                        //the ObjectInputStream constructor blocks until it gets it
                        ObjectOutputStream objOut=new ObjectOutputStream(rawOut);
                        objOut.flush();
                        ObjectInputStream objIn=new ObjectInputStream(rawIn);
                        
                        is=rawIn;
                        os=rawOut;
                        out=objOut;
                        in=objIn;
                        socket=newSocket;
                } catch (Exception e) {
                        closeSocket(newSocket);
                        failure(e);
                }
        }
        
        /**
         *   Flushes the pending request and reads the answer.
         *
         *   @return  null if the call has no result (status 0), the
         *            deserialized result otherwise (status 1)
         *   @throws RemoteException  carrying the message sent by the server if
         *            the call failed there (any other status), or
         *            "Cannot contact the remote object" on a communication error
         */
        public Object waitAnswer() throws RemoteException
        {
                String remoteError;
                
                try {
                        out.flush();
                        int val=is.read();
                        if (val==-1) throw new IOException("Connection closed by the remote object");
                        if (val==0) return null;
                        Object answer=in.readObject();
                        if (val==1) return answer;
                        remoteError=(String)answer;
                } catch (Exception e) {
                        failure(e);
                        return null;
                }
                
                //Thrown outside of the try block: the catch above must not turn
                //the server's own error message into a communication failure
                throw new RemoteException(remoteError);
        }       
        
        /**
         *   Logs a communication problem, gives the connection up (the next
         *   call opens a new one) and reports the problem to the caller.
         *
         *   @throws RemoteException  always; the original exception is its cause
         */
        void failure(Exception e) throws RemoteException
        {
                Log.error(e);
                dropConnection();
                throw new RemoteException("Cannot contact the remote object",e);
        }       
        
        /**
         *   Forgets the current connection: after an I/O error the stream
         *   cannot be trusted any more.
         */
        private void dropConnection()
        {
                Socket old=socket;
                socket=null;
                is=null;
                os=null;
                in=null;
                out=null;
                closeSocket(old);
        }
        
        private void closeSocket(Socket s)
        {
                if (s==null) return;
                try {
                        s.close();
                } catch (IOException ignored) {
                        //The connection is given up anyway, nothing more can be done
                }
        }
        
        /**
         *   Does one remote call: opens the connection if needed, writes the
         *   method code, the optional raw flag byte and the arguments, then
         *   waits for the answer.
         *
         *   @param code  the method code
         *   @param flag  a raw byte written right after the code, or -1 for none
         *   @param args  the arguments, serialized in this order
         *   @return      the result decoded by waitAnswer()
         */
        private Object invoke(int code,int flag,Object[] args) throws RemoteException
        {
                if (socket==null) createConnection();
                
                try {
                        os.write(code);
                        if (flag!=-1) os.write(flag);
                        for (int i=0;i<args.length;i++) out.writeObject(args[i]);
                } catch (IOException e) {
                        failure(e);
                }
                
                return waitAnswer();
        }
        
        //--- Remote Calls
        
        public String getID() throws Exception
        {
                return (String)invoke(GetID,-1,new Object[0]);
        }
        
        public void newMessage(SpyMessage val[],String id) throws JMSException, RemoteException
        {
                invoke(NewMessage,-1,new Object[] {val,id});
        }
        
        public void subscribe(Destination dest,SpyDistributedConnection who) throws JMSException, RemoteException
        {
                invoke(Subscribe,-1,new Object[] {dest,who});
        }
        
        public void unsubscribe(Destination dest,SpyDistributedConnection who) throws JMSException, RemoteException
        {
                invoke(Unsubscribe,-1,new Object[] {dest,who});
        }
        
        public Topic createTopic(String dest) throws JMSException, RemoteException
        {
                return (Topic)invoke(CreateTopic,-1,new Object[] {dest});
        }
        
        public Queue createQueue(String dest) throws JMSException, RemoteException
        {
                return (Queue)invoke(CreateQueue,-1,new Object[] {dest});
        }
        
        public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws JMSException, RemoteException
        {
                return (TemporaryTopic)invoke(GetTemporaryTopic,-1,new Object[] {dc});
        }
        
        public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws JMSException, RemoteException
        {
                return (TemporaryQueue)invoke(GetTemporaryQueue,-1,new Object[] {dc});
        }
        
        public void connectionClosing(SpyDistributedConnection dc) throws JMSException, RemoteException
        {
                invoke(ConnectionClosing,-1,new Object[] {dc});
        }
        
        public void deleteTemporaryDestination(SpyDestination dest) throws JMSException, RemoteException
        {
                invoke(DeleteTemporaryDestination,-1,new Object[] {dest});
        }
        
        public void checkID(String ID) throws JMSException, RemoteException
        {
                invoke(CheckID,-1,new Object[] {ID});
        }
        
        public SpyMessage queueReceiveNoWait(Queue queue) throws Exception, RemoteException
        {
                return (SpyMessage)invoke(QueueReceiveNoWait,-1,new Object[] {queue});
        }
        
        public void connectionListening(boolean mode,Destination dest,SpyDistributedConnection dc) throws Exception, RemoteException
        {
                //The boolean travels as one raw byte (1 = true) before the objects
                invoke(ConnectionListening,mode ? 1 : 0,new Object[] {dest,dc});
        }
  
        //--
        
        public void setServer(JMSServer s) throws Exception
        {
                //Nothing !
        }
        
  }
  
  
  

Reply via email to