User: norbert 
  Date: 00/06/15 15:50:32

  Modified:    src/java/org/spydermq/distributed/server
                        ConnectionReceiverOIL.java
                        ConnectionReceiverOILClient.java
                        ConnectionReceiverRMIImpl.java
  Log:
  More work on the OIL
  
  Revision  Changes    Path
  1.2       +39 -18    spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
  
  Index: ConnectionReceiverOIL.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ConnectionReceiverOIL.java        2000/06/15 04:10:00     1.1
  +++ ConnectionReceiverOIL.java        2000/06/15 22:50:31     1.2
  @@ -25,7 +25,9 @@
   import java.rmi.RemoteException; 
   import java.net.ServerSocket;
   import java.net.Socket;
  +import java.net.InetAddress;
   import java.io.ObjectOutputStream;
  +import java.io.BufferedOutputStream;
   import java.io.ObjectInputStream;
   import java.io.InputStream;
   import java.io.OutputStream;
  @@ -36,9 +38,10 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
  -public class ConnectionReceiverOIL implements ConnectionReceiver, Runnable
  +public class ConnectionReceiverOIL 
  +     implements ConnectionReceiver, Runnable
   {
        // Attributes ----------------------------------------------------
   
  @@ -65,15 +68,14 @@
        static final int CLOSE = 4;             
        
        ServerSocket serverSocket;
  -     
  -     public void exportObject()      
  +
  +     void exportObject()     
        {
                try {
  -                     //Should be dynamic...
  -                      serverSocket = new ServerSocket(12345);
  +                      serverSocket = new ServerSocket(0);
                         new Thread(this).start();
                } catch (IOException e) {
  -                     Log.error("Bug");
  +                     failure("Initialization",e);
                }
        }
        
  @@ -88,16 +90,18 @@
                      
                try {
                        socket = serverSocket.accept();
  -                 
  -                     new Thread(this).start();
  -
  -                     is=socket.getInputStream();
  -                     os=socket.getOutputStream();
  -                     out = new ObjectOutputStream(os);
  +               
  +                     //We have our connection to the broker... there's no need to wait for another connection
  +                     //new Thread(this).start();
  +
  +                     is = socket.getInputStream();
  +                     os = socket.getOutputStream();
  +                     out = new ObjectOutputStream(os); 
                        in = new ObjectInputStream(is);
   
                } catch (IOException e) {
  -                     Log.error("Bug");
  +                     failure("Initialisation",e);
  +                     return;
                }
   
                while (true) {
  @@ -105,9 +109,10 @@
                        Log.log("Wait for command");
                
                        try {
  -                             code=in.read();         
  +                             code=is.read();         
                        } catch (IOException e) {
  -                             Log.error("Bug");
  +                             failure("Command read",e);
  +                             return;
                        }
                
                        try {
  @@ -137,8 +142,10 @@
                                
                                try {
                                        os.write(0);
  +                                     os.flush();
                                } catch (IOException e) {
  -                                     Log.error("Bug");
  +                                     failure("Result write",e);
  +                                     return;                                 
                                }
                                
                        } catch (Exception e) {
  @@ -147,13 +154,27 @@
   
                                try {
                                        os.write(1);
  +                                     os.flush();
                                } catch (IOException e2) {
  -                                     Log.error("Bug");
  +                                     failure("Result write",e2);
  +                                     return;                                 
                                }
                                
                        }
   
                }
  +     }
  +     
  +     void failure(String st,Exception e)
  +     {
  +             Log.error(st);
  +             Log.error(e);
  +     }
  +     
  +     public ConnectionReceiver createClient() throws Exception
  +     {
  +             
  +             return new ConnectionReceiverOILClient(InetAddress.getLocalHost(),serverSocket.getLocalPort());
        }
        
        // Public --------------------------------------------------------      
  
  
  
  1.2       +42 -14    spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java
  
  Index: ConnectionReceiverOILClient.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ConnectionReceiverOILClient.java  2000/06/15 04:10:00     1.1
  +++ ConnectionReceiverOILClient.java  2000/06/15 22:50:31     1.2
  @@ -11,43 +11,56 @@
   import java.io.InputStream;
   import java.io.OutputStream;
   import java.io.IOException;
  +import java.io.Serializable;
   import java.net.Socket;
  +import java.net.InetAddress;
   
   public class ConnectionReceiverOILClient
  -     implements ConnectionReceiver
  +     implements ConnectionReceiver, Serializable
   {
        static final int RECEIVE = 1;
        static final int RECEIVE_MULTIPLE = 2;
        static final int DELETE_TEMPORARY_DESTINATION = 3;
        static final int CLOSE = 4;             
        
  -     Socket socket;
  -     InputStream is;
  -     OutputStream os;
  -     ObjectOutputStream out;
  -     ObjectInputStream in;
  +     private transient Socket socket;
  +     private transient InputStream is;
  +     private transient OutputStream os;
  +     private transient ObjectOutputStream out;
  +     private transient ObjectInputStream in;
  +     
  +     private int port;
  +     private InetAddress addr;
  +     
  +     public ConnectionReceiverOILClient(InetAddress addr,int port)
  +     {
  +             socket=null;
  +             this.port=port;
  +             this.addr=addr;
  +     }
        
  -     ConnectionReceiverOILClient() throws RemoteException 
  +     void createConnection() throws RemoteException
        {
  -             try {
  -                     Socket socket=new Socket("localhost",12345);
  +             try {                   
  +                     socket=new Socket(addr,port);
                        is=socket.getInputStream();
  -                     os=socket.getOutputStream();
  +                     os=socket.getOutputStream(); //BufferOutputStream instead
                        in=new ObjectInputStream(is);
                        out=new ObjectOutputStream(os);
                } catch (Exception e) {
                        Log.error(e);
  +                     throw new RemoteException("Cannot connect to the ConnectionReceiver/Server");
                }
        }
        
  -     public void waitAnswer() throws Exception 
  +     public void waitAnswer() throws RemoteException
        {
                try {
                        int val=is.read();
                        if (val==0) Log.log("Return : OK");
                        else {
                                Log.log("Return : Exception");
  -                             throw new Exception("Remote Exception");
  +                             throw new RemoteException("Remote Exception");
                        }
                } catch (IOException e) {
                        Log.error("IOException while reading the answer");
  @@ -57,37 +70,52 @@
        }
        
       public void receive(SpyDestination dest,SpyMessage mes) throws Exception
  -     {               
  +     {
  +             if (socket==null) createConnection();
                os.write(RECEIVE);
                out.writeObject(dest);
                out.writeObject(mes);
  +             os.flush();
                waitAnswer();
        }
                
       public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws Exception       
        {
  +             if (socket==null) createConnection();
                os.write(RECEIVE_MULTIPLE);
                out.writeObject(dest);
                out.writeObject(mes);
  +             os.flush();
                waitAnswer();
        }       
        
       public void deleteTemporaryDestination(SpyDestination dest) throws Exception
        {
  +             if (socket==null) createConnection();
                os.write(DELETE_TEMPORARY_DESTINATION);
                out.writeObject(dest);
  +             os.flush();
                waitAnswer();
        }       
       
        public void close() throws Exception
        {
  +             if (socket==null) createConnection();
                os.write(CLOSE);
  +             os.flush();
                waitAnswer();
        }
        
        public void setConnection(SpyConnection connection) throws Exception
        {
  -             //SHOULD NOT be there !!!
  +             //SHOULD NOT be there !!! - separate the interfaces
        }
  +
  +     public ConnectionReceiver createClient() throws Exception
  +     {
  +             return null;
  +             //SHOULD NOT be there !!! - separate the interfaces
  +     }
  +
   
   }
  
  
  
  1.9       +10 -2     spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java
  
  Index: ConnectionReceiverRMIImpl.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- ConnectionReceiverRMIImpl.java    2000/06/15 04:02:28     1.8
  +++ ConnectionReceiverRMIImpl.java    2000/06/15 22:50:31     1.9
  @@ -23,15 +23,18 @@
   import java.util.Hashtable;
   import java.util.HashSet;
   import java.util.Iterator;
  +import org.spydermq.distributed.interfaces.ConnectionReceiver;
   
   /**
    *   The RMI implementation of the ConnectionReceiver object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.8 $
  + *   @version $Revision: 1.9 $
    */
  -public class ConnectionReceiverRMIImpl extends UnicastRemoteObject implements ConnectionReceiverRMI
  +public class ConnectionReceiverRMIImpl 
  +     extends UnicastRemoteObject 
  +     implements ConnectionReceiverRMI
   {
        // Attributes ----------------------------------------------------
   
  @@ -152,4 +155,9 @@
                connection.deleteTemporaryDestination(dest);
        }
   
  +     public ConnectionReceiver createClient() throws JMSException
  +     {
  +             return this;
  +     }
  +     
   }
  
  
  

Reply via email to