User: norbert 
  Date: 00/06/19 19:19:14

  Modified:    src/java/org/spydermq/distributed/server
                        ConnectionReceiverOIL.java
                        ConnectionReceiverOILClient.java
                        DistributedJMSServerOIL.java
                        DistributedJMSServerOILClient.java
  Log:
  Some optimizations to the OIL
  
  Revision  Changes    Path
  1.6       +59 -17    
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
  
  Index: ConnectionReceiverOIL.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- ConnectionReceiverOIL.java        2000/06/19 21:52:00     1.5
  +++ ConnectionReceiverOIL.java        2000/06/20 02:19:13     1.6
  @@ -27,11 +27,12 @@
   import java.net.ServerSocket;
   import java.net.Socket;
   import java.net.InetAddress;
  -import java.io.ObjectOutputStream;
  -import java.io.BufferedOutputStream;
  -import java.io.ObjectInputStream;
   import java.io.InputStream;
   import java.io.OutputStream;
  +import java.io.BufferedOutputStream;
  +import java.io.BufferedInputStream;
  +import java.io.ObjectInputStream;
  +import java.io.ObjectOutputStream;
   import java.io.IOException;
   
   /**
  @@ -39,7 +40,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.5 $
  + *   @version $Revision: 1.6 $
    */
   public class ConnectionReceiverOIL 
        implements Runnable, ConnectionReceiverSetup
  @@ -82,8 +83,10 @@
        {
                Socket socket = null;
                int code = 0;
  -             InputStream is=null;
  -             OutputStream os=null;
  +             //InputStream is=null;
  +             //OutputStream os=null;
  +             BufferedInputStream is=null;
  +             BufferedOutputStream os=null;
                ObjectOutputStream out=null;
                ObjectInputStream in=null;
                      
  @@ -92,10 +95,14 @@
                  
                        //We have our connection to the broker... there's no need to 
wait for another connection
                        //new Thread(this).start();
  +
  +                     //is = socket.getInputStream();
  +                     //os = socket.getOutputStream();
  +                     is = new BufferedInputStream(socket.getInputStream());
  +                     os = new BufferedOutputStream(socket.getOutputStream());
   
  -                     is = socket.getInputStream();
  -                     os = socket.getOutputStream();
  -                     out = new ObjectOutputStream(os); 
  +                     out = new ObjectOutputStream(os);
  +                     out.flush();
                        in = new ObjectInputStream(is);
   
                } catch (IOException e) {
  @@ -106,13 +113,14 @@
                while (true) {
   
                        try {
  -                             code=is.read();         
  +                             code=is.read();
                        } catch (IOException e) {
                                failure("Command read",e);
                                return;
                        }
                
                        try {
  +
                                
                                switch (code)
                                {
  @@ -120,7 +128,9 @@
                                                
receive((SpyDestination)in.readObject(),(SpyMessage)in.readObject());
                                                break;
                                        case RECEIVE_MULTIPLE: 
  -                                             
receiveMultiple((SpyDestination)in.readObject(),(SpyMessage[])in.readObject());
  +                                             SpyDestination 
dest=(SpyDestination)in.readObject();
  +                                             int nb=in.readInt();
  +                                             receiveMultiple(dest,nb,in);
                                                break;
                                        case DELETE_TEMPORARY_DESTINATION:
                                                
deleteTemporaryDestination((SpyDestination)in.readObject());
  @@ -147,6 +157,7 @@
                                try {
                                        os.write(1);
                                        out.writeObject(e.getMessage());
  +                                     out.flush();
                                        os.flush();
                                } catch (IOException e2) {
                                        failure("Result write",e2);
  @@ -269,14 +280,45 @@
                        
                }
        } 
  -     
   
  -     public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
JMSException
  -     {
  -             for(int i=0;i<mes.length;i++) {
  -                     receive(dest,mes[i]);
  +    public void receiveMultiple(SpyDestination dest,int nb,ObjectInputStream in) 
throws Exception
  +     {               
  +             if (closed) throw new IllegalStateException("The connection is 
closed");
  +             
  +             Log.log("ConnectionReceiver: ReceiveMultiple()");
  +             
  +             if (connection instanceof SpyTopicConnection) {
  +             
  +                     //Get the set of subscribers for this Topic
  +             
  +                     ConnectionQueue 
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);             
  +                     if (connectionQueue==null) return;
  +     
  +                     for(int val=0;val<nb;val++) {
  +                     
  +                             SpyMessage mes=(SpyMessage)in.readObject();
  +                             
  +                             //NL: i is a short-lived object. Try to "group" 
messages in an pre-allocated/peristant
  +                             //array and apply the same iterator on this array
  +                             Iterator i=connectionQueue.subscribers.iterator();
  +                                     
  +                             while (i.hasNext()) {                                  
                         
  +                             
  +                                     SpySession session=(SpySession)i.next();
  +                                     
  +                                     //add the new message to the session's queue
  +                                     session.dispatchMessage(dest,mes);
  +                             
  +                                     //There is work to do... 
  +                                     session.mutex.notifyLock();
  +                             }
  +                     }
  +                     
  +             } else {
  +                     throw new Exception("Multiple dispatch for a Queue");          
         
                }
  -     }
  +     } 
  +     
        
       public void close() throws Exception
        {
  
  
  
  1.7       +11 -6     
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java
  
  Index: ConnectionReceiverOILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- ConnectionReceiverOILClient.java  2000/06/19 21:52:00     1.6
  +++ ConnectionReceiverOILClient.java  2000/06/20 02:19:13     1.7
  @@ -9,7 +9,9 @@
   import java.io.ObjectOutputStream;
   import java.io.ObjectInputStream;
   import java.io.InputStream;
  +import java.io.BufferedInputStream;
   import java.io.OutputStream;
  +import java.io.BufferedOutputStream;
   import java.io.IOException;
   import java.io.Serializable;
   import java.net.Socket;
  @@ -24,8 +26,8 @@
        static final int CLOSE = 4;             
        
        private transient Socket socket;
  -     private transient InputStream is;
  -     private transient OutputStream os;
  +     private transient BufferedInputStream is;
  +     private transient BufferedOutputStream os;
        private transient ObjectOutputStream out;
        private transient ObjectInputStream in;
        
  @@ -43,11 +45,12 @@
        {
                try {                   
                        socket=new Socket(addr,port);
  -                     is=socket.getInputStream();
  -                     os=socket.getOutputStream();
  +                     is = new BufferedInputStream(socket.getInputStream());
  +                     os = new BufferedOutputStream(socket.getOutputStream());
                        in=new ObjectInputStream(is);
                        out=new ObjectOutputStream(os);
  -             } catch (Exception e) {
  +                     os.flush();
  +             } catch (Exception e) {                 
                        Log.error(e);
                        throw new RemoteException("Cannot connect to the 
ConnectionReceiver/Server");
                }
  @@ -85,7 +88,9 @@
                if (socket==null) createConnection();
                os.write(RECEIVE_MULTIPLE);
                out.writeObject(dest);
  -             out.writeObject(mes);
  +             out.writeInt(mes.length);
  +             for(int i=0;i<mes.length;i++)
  +                     out.writeObject(mes[i]);
                waitAnswer();
        }       
        
  
  
  
  1.3       +31 -9     
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java
  
  Index: DistributedJMSServerOIL.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- DistributedJMSServerOIL.java      2000/06/19 21:52:00     1.2
  +++ DistributedJMSServerOIL.java      2000/06/20 02:19:13     1.3
  @@ -7,6 +7,7 @@
   import javax.jms.TemporaryTopic;
   import javax.jms.TemporaryQueue;
   import org.spydermq.SpyMessage;
  +import org.spydermq.JMSServerQueue;
   import org.spydermq.SpyDestination;
   import org.spydermq.JMSServer;
   import org.spydermq.SpyDistributedConnection;
  @@ -20,8 +21,7 @@
   import java.io.ObjectOutputStream;
   import java.io.BufferedOutputStream;
   import java.io.ObjectInputStream;
  -import java.io.InputStream;
  -import java.io.OutputStream;
  +import java.io.BufferedInputStream;
   import java.io.IOException;
   
   public class DistributedJMSServerOIL
  @@ -72,8 +72,8 @@
        {
                Socket socket = null;
                int code = 0;
  -             InputStream is=null;
  -             OutputStream os=null;
  +             BufferedInputStream is=null;
  +             BufferedOutputStream os=null;
                ObjectOutputStream out=null;
                ObjectInputStream in=null;
                      
  @@ -82,9 +82,10 @@
                  
                        new Thread(this).start();
   
  -                     is = socket.getInputStream();
  -                     os = socket.getOutputStream();
  -                     out = new ObjectOutputStream(os); 
  +                     is = new BufferedInputStream(socket.getInputStream());
  +                     os = new BufferedOutputStream(socket.getOutputStream());
  +                     out = new ObjectOutputStream(os);
  +                     os.flush();
                        in = new ObjectInputStream(is);
   
                } catch (IOException e) {
  @@ -110,8 +111,8 @@
                                        case GetID: 
                                                result=server.getID();
                                                break;
  -                                     case NewMessage: 
  -                                             
server.newMessage((SpyMessage[])in.readObject(),(String)in.readObject());
  +                                     case NewMessage:                               
                 
  +                                             
newMessage((String)in.readObject(),in.readInt(),in);
                                                break;
                                        case Subscribe: 
                                                
server.subscribe((Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
  @@ -184,6 +185,27 @@
        {
                Log.error("Closing socket: "+st);
                Log.error(e);
  +     }
  +     
  +     void newMessage(String id,int nb,ObjectInputStream in) throws Exception
  +     {
  +             Log.notice("INCOMING: "+nb+" messages from "+id);
  +             
  +             SpyDestination dest=null;
  +             JMSServerQueue queue=null;
  +             
  +             for(int i=0;i<nb;i++) { 
  +                     
  +                     SpyMessage mes=(SpyMessage)in.readObject();
  +                     
  +                     if (dest==null||!dest.equals(mes.jmsDestination)) {
  +                             
queue=(JMSServerQueue)server.messageQueue.get(mes.jmsDestination);
  +                             if (queue==null) throw new JMSException("This 
destination does not exist !"); //hum...
  +                     }
  +             
  +                     //Add the message to the queue          
  +                     queue.addMessage(mes);
  +             }
        }
        
        // --
  
  
  
  1.2       +19 -23    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java
  
  Index: DistributedJMSServerOILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- DistributedJMSServerOILClient.java        2000/06/19 04:23:14     1.1
  +++ DistributedJMSServerOILClient.java        2000/06/20 02:19:13     1.2
  @@ -14,8 +14,8 @@
   import java.rmi.RemoteException;
   import java.io.ObjectOutputStream;
   import java.io.ObjectInputStream;
  -import java.io.InputStream;
  -import java.io.OutputStream;
  +import java.io.BufferedInputStream;
  +import java.io.BufferedOutputStream;
   import java.io.IOException;
   import java.io.Serializable;
   import java.net.Socket;
  @@ -43,8 +43,8 @@
        //Remote stuff
        
        private transient Socket socket;
  -     private transient InputStream is;
  -     private transient OutputStream os;
  +     private transient BufferedInputStream is;
  +     private transient BufferedOutputStream os;
        private transient ObjectOutputStream out;
        private transient ObjectInputStream in;
        
  @@ -62,10 +62,11 @@
        {
                try {                   
                        socket=new Socket(addr,port);
  -                     is=socket.getInputStream();
  -                     os=socket.getOutputStream();
  +                     is=new BufferedInputStream(socket.getInputStream());
  +                     os=new BufferedOutputStream(socket.getOutputStream());
                        in=new ObjectInputStream(is);
                        out=new ObjectOutputStream(os);
  +                     os.flush();
                } catch (Exception e) {
                        failure(e);
                }
  @@ -98,34 +99,36 @@
        
        //--- Remote Calls
        
  -     public String getID() throws Exception
  +    public void newMessage(SpyMessage val[],String id) throws JMSException, 
RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     os.write(GetID);
  +                     os.write(NewMessage);
  +                     out.writeObject(id);
  +                     out.writeInt(val.length);
  +                     for(int i=0;i<val.length;i++)
  +                             out.writeObject(val[i]);
                } catch (IOException e) {
                        failure(e);
                }
                
  -             return (String)waitAnswer();
  +             waitAnswer();
        }
  -     
  -    public void newMessage(SpyMessage val[],String id) throws JMSException, 
RemoteException
  +
  +     public String getID() throws Exception
        {
                if (socket==null) createConnection();
                
                try {
  -                     os.write(NewMessage);
  -                     out.writeObject(val);
  -                     out.writeObject(id);
  +                     os.write(GetID);
                } catch (IOException e) {
                        failure(e);
                }
                
  -             waitAnswer();
  +             return (String)waitAnswer();
        }
  -     
  +             
       public void subscribe(Destination dest,SpyDistributedConnection who) throws 
JMSException, RemoteException
        {
                if (socket==null) createConnection();
  @@ -283,13 +286,6 @@
                }
                
                waitAnswer();
  -     }
  -
  -     //--
  -     
  -     public void setServer(JMSServer s) throws Exception
  -     {
  -             //Nothing !
        }
        
   }
  
  
  

Reply via email to