User: pkendall
  Date: 01/08/08 17:27:21

  Modified:    src/main/org/jbossmq/il/oil OILClientIL.java
                        OILClientILService.java OILServerIL.java
                        OILServerILService.java
  Log:
  Major updates (especially to topics).
  Speed improvements.
  Make JVM IL work (by using a singleton JMSServer).
  Message Listeners re-implemented using client-side thread.
  
  Revision  Changes    Path
  1.3       +10 -12    jbossmq/src/main/org/jbossmq/il/oil/OILClientIL.java
  
  Index: OILClientIL.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/oil/OILClientIL.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- OILClientIL.java  2001/07/16 02:51:45     1.2
  +++ OILClientIL.java  2001/08/09 00:27:21     1.3
  @@ -25,18 +25,14 @@
   
   /**
    *   The RMI implementation of the ConnectionReceiver object
  - *      
  + *
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
  - * 
  - *   @version $Revision: 1.2 $
  + *
  + *   @version $Revision: 1.3 $
    */
   public class OILClientIL implements ClientIL, java.io.Serializable {
   
  -
  -
  -
  -
        public void close() throws Exception {
                checkSocket();
                out.writeByte(m_close);
  @@ -57,7 +53,9 @@
                checkSocket();
                cat.debug("Writing request");
                out.writeByte(m_receive);
  -             out.writeObject(messages);
  +    out.writeInt(messages.length);
  +    for(int i=0;i<messages.length;++i)
  +             messages[i].writeExternal(out);
                cat.debug("Waiting for awnser");
                waitAnswer();
                cat.debug("Done");
  @@ -84,13 +82,13 @@
   
        protected void createConnection() throws RemoteException
        {
  -             try {                   
  +             try {
                        cat.info("ConnectionReceiverOILClient is connecting to: 
"+addr.getHostAddress()+":"+port);
                        socket=new Socket(addr,port);
                        out=new ObjectOutputStream(new 
BufferedOutputStream(socket.getOutputStream()));
                        out.flush();
                        in=new ObjectInputStream(new 
BufferedInputStream(socket.getInputStream()));
  -             } catch (Exception e) {                 
  +             } catch (Exception e) {
                        cat.debug(e);
                        throw new RemoteException("Cannot connect to the 
ConnectionReceiver/Server");
                }
  @@ -104,15 +102,15 @@
                        out.flush();
                        int val=in.readByte();
                        switch(val) {
  -                     case 1:                         
  +                     case 1:
                                Exception e=(Exception)in.readObject();
                                throwException = new RemoteException("", e);
                                break;
                        }
  -             } catch (IOException e) {            
  +             } catch (IOException e) {
                        throw new RemoteException("Cannot contact the remote 
object",e);
                }
  -             
  +
                if( throwException != null )
                        throw throwException;
        }
  
  
  
  1.4       +36 -15    jbossmq/src/main/org/jbossmq/il/oil/OILClientILService.java
  
  Index: OILClientILService.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/oil/OILClientILService.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- OILClientILService.java   2001/08/01 01:18:32     1.3
  +++ OILClientILService.java   2001/08/09 00:27:21     1.4
  @@ -26,11 +26,11 @@
   
   /**
    *   The RMI implementation of the ConnectionReceiver object
  - *      
  + *
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class OILClientILService implements org.jbossmq.il.ClientILService, Runnable 
{
        //the client IL
  @@ -56,21 +56,24 @@
         * start method comment.
         */
        public void start() throws java.lang.Exception {
  -             
  +
                running = true;
                worker = new Thread(connection.threadGroup, this, 
"OILClientILService");
                worker.setDaemon(true);
  -             worker.start();         
  -             
  +             worker.start();
  +
        }
   
        /**
  -      * 
  +      *
         */
        public void stop() throws java.lang.Exception {
                running = false;
                worker.interrupt();
  -             socket.close();
  +    if(serverSocket != null)
  +      serverSocket.close();
  +    if(socket != null)
  +             socket.close();
        }
   
        static org.apache.log4j.Category cat = 
org.apache.log4j.Category.getInstance(OILClientILService.class);
  @@ -98,16 +101,19 @@
                        cat.debug("Waiting for the server to connect to me");
                        // We may close() before we get a connection so we need to
                        // periodicaly check to see if we were !running.
  -                     serverSocket.setSoTimeout(1000);                        
  +                     serverSocket.setSoTimeout(1000);
                        while( socket==null ) {
  -                             try {                   
  +                             try {
                                        socket = serverSocket.accept();
                                } catch (java.io.InterruptedIOException e) {
  -                             }
  +                             } catch (IOException e){
  +          if(running)
  +            connection.asynchFailure("Error accepting connection from server in 
OILClientILService.", e);
  +          return;
  +        }
                                if( !running )
                                        return;
                        }
  -                     serverSocket.close();
                        socket.setSoTimeout(0);
                        out = new ObjectOutputStream(new 
BufferedOutputStream(socket.getOutputStream()));
                        out.flush();
  @@ -116,7 +122,13 @@
                } catch (IOException e) {
                        connection.asynchFailure("Could not initialize the OILClientIL 
Service.", e);
                        return;
  -             }
  +             } finally {
  +               try{
  +        serverSocket.close();
  +        serverSocket = null;
  +               }catch(IOException e){
  +               }
  +    }
   
                while (running) {
   
  @@ -133,7 +145,13 @@
   
                                switch (code) {
                                        case m_receive:
  -                                             
connection.asynchDeliver((org.jbossmq.ReceiveRequest[])in.readObject());
  +            int numReceives = in.readInt();
  +            org.jbossmq.ReceiveRequest [] messages = new 
org.jbossmq.ReceiveRequest[numReceives];
  +            for(int i=0;i<numReceives;++i){
  +              messages[i] = new ReceiveRequest();
  +              messages[i].readExternal(in);
  +            }
  +                                             connection.asynchDeliver(messages);
                                                break;
                                        case m_deleteTemporaryDestination:
                                                
connection.asynchDeleteTemporaryDestination((SpyDestination) in.readObject());
  @@ -154,8 +172,8 @@
                                        connection.asynchFailure("Connection failure", 
e);
                                        return;
                                }
  +
   
  -                             
                        } catch (Exception e) {
                                if (!running)
                                        break;
  @@ -174,11 +192,14 @@
                        }
   
                }
  -             
  +
                try {
                        cat.debug("Closing receiver connections.");
                        out.close();
                        in.close();
  +      if(socket != null);
  +        socket.close();
  +      socket = null;
                } catch (IOException e) {
                        connection.asynchFailure("Connection failure", e);
                        return;
  
  
  
  1.3       +18 -18    jbossmq/src/main/org/jbossmq/il/oil/OILServerIL.java
  
  Index: OILServerIL.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/oil/OILServerIL.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- OILServerIL.java  2001/07/16 02:51:45     1.2
  +++ OILServerIL.java  2001/08/09 00:27:21     1.3
  @@ -19,6 +19,7 @@
   import org.jbossmq.SpyDestination;
   import org.jbossmq.ConnectionToken;
   import org.jbossmq.il.ServerIL;
  +import org.jbossmq.DurableSubcriptionID;
   import org.jbossmq.server.JMSServer;
   
   import org.jbossmq.il.uil.multiplexor.SocketMultiplexor;
  @@ -32,11 +33,11 @@
   
   /**
    *   The JVM implementation of the ServerIL object
  - *      
  + *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *   @author Norbert Lataille ([EMAIL PROTECTED])
  - * 
  - *   @version $Revision: 1.2 $
  + *
  + *   @version $Revision: 1.3 $
    */
   public class OILServerIL implements ServerIL, java.io.Serializable, Cloneable {
   
  @@ -49,7 +50,7 @@
        public ServerIL cloneServerIL() throws Exception {
                return (ServerIL)clone();
        }
  -     
  +
        synchronized public void setConnectionToken(ConnectionToken dest) throws 
Exception {
                checkConnection();
                out.writeByte(m_setSpyDistributedConnection);
  @@ -66,7 +67,7 @@
        synchronized public void addMessage(ConnectionToken dc, SpyMessage val) throws 
Exception {
                checkConnection();
                out.writeByte(m_addMessage);
  -             out.writeObject(val);
  +             SpyMessage.writeMessage(val,out);
                waitAnswer();
        }
   
  @@ -119,7 +120,7 @@
        synchronized public void acknowledge(ConnectionToken dc, 
AcknowledgementRequest item) throws JMSException, Exception {
                checkConnection();
                out.writeByte(m_acknowledge);
  -             out.writeObject(item);
  +             item.writeExternal(out);
                waitAnswer();
        }
   
  @@ -131,14 +132,6 @@
                return (SpyMessage[]) waitAnswer();
        }
   
  -     synchronized public void listenerChange(ConnectionToken dc, int subscriberId, 
boolean state) throws Exception, Exception {
  -             checkConnection();
  -             out.writeByte(m_listenerChange);
  -             out.writeInt(subscriberId);
  -             out.writeBoolean(state);
  -             waitAnswer();
  -     }
  -
        synchronized public SpyMessage receive(ConnectionToken dc, int subscriberId, 
long wait) throws Exception, Exception {
                checkConnection();
                out.writeByte(m_receive);
  @@ -161,12 +154,19 @@
                waitAnswer();
        }
   
  +     synchronized public void destroySubscription(DurableSubcriptionID id) throws 
JMSException, Exception {
  +             checkConnection();
  +             out.writeByte(m_destroySubscription);
  +             out.writeObject(id);
  +             waitAnswer();
  +     }
  +
        synchronized public String checkUser(String userName, String password) throws 
JMSException, Exception {
                checkConnection();
                out.writeByte(m_checkUser);
                out.writeObject(userName);
                out.writeObject(password);
  -             return (String)waitAnswer();            
  +             return (String)waitAnswer();
        }
   
        synchronized public void subscribe(ConnectionToken dc, 
org.jbossmq.Subscription s) throws JMSException, Exception {
  @@ -179,7 +179,7 @@
        synchronized public void transact(org.jbossmq.ConnectionToken dc, 
TransactionRequest t) throws JMSException, Exception {
                checkConnection();
                out.writeByte(m_transact);
  -             out.writeObject(t);
  +    t.writeExternal(out);
                waitAnswer();
        }
   
  @@ -189,7 +189,6 @@
        static final int m_addMessage = 2;
        static final int m_browse = 3;
        static final int m_checkID = 4;
  -     static final int m_checkUser = 19;
        static final int m_connectionClosing = 5;
        static final int m_createQueue = 6;
        static final int m_createTopic = 7;
  @@ -197,13 +196,14 @@
        static final int m_getID = 9;
        static final int m_getTemporaryQueue = 10;
        static final int m_getTemporaryTopic = 11;
  -     static final int m_listenerChange = 12;
        static final int m_receive = 13;
        static final int m_setEnabled = 14;
        static final int m_setSpyDistributedConnection = 15;
        static final int m_subscribe = 16;
        static final int m_transact = 17;
        static final int m_unsubscribe = 18;
  +     static final int m_destroySubscription = 19;
  +     static final int m_checkUser = 20;
   
        protected transient ObjectOutputStream out;
        //Remote stuff
  
  
  
  1.7       +49 -43    jbossmq/src/main/org/jbossmq/il/oil/OILServerILService.java
  
  Index: OILServerILService.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/oil/OILServerILService.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- OILServerILService.java   2001/08/01 01:18:32     1.6
  +++ OILServerILService.java   2001/08/09 00:27:21     1.7
  @@ -16,6 +16,7 @@
   import org.jbossmq.il.ServerILJMXService;
   import org.jbossmq.il.ServerIL;
   import org.jbossmq.server.JMSServer;
  +import org.jbossmq.DurableSubcriptionID;
   
   import java.net.Socket;
   import javax.jms.JMSException;
  @@ -42,14 +43,14 @@
   import org.jbossmq.SpyDestination;
   
   /**
  - *   Implements the ServerILJMXService which is used to 
  + *   Implements the ServerILJMXService which is used to
    *  manage the JVM IL.
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
  - *   @version $Revision: 1.6 $
  + *   @version $Revision: 1.7 $
    */
   public class OILServerILService extends ServerILJMXService implements 
OILServerILServiceMBean, Runnable {
  -     
  +
        OILServerIL serverIL;
   
        /**
  @@ -77,13 +78,13 @@
   
                serverSocket = new ServerSocket(serverBindPort);
                serverSocket.setSoTimeout(SO_TIMEOUT);
  -             
  +
                category.info("JBossMQ OIL service available at : 
"+InetAddress.getLocalHost().getHostAddress()+ ":"+ serverSocket.getLocalPort());
                worker = new Thread(server.threadGroup, this,"OIL Worker");
                worker.start();
  -             
  +
                serverIL = new OILServerIL(InetAddress.getLocalHost(), 
serverSocket.getLocalPort());
  -             
  +
                bindJNDIReferences();
   
        }
  @@ -115,7 +116,6 @@
        static final int m_addMessage = 2;
        static final int m_browse = 3;
        static final int m_checkID = 4;
  -     static final int m_checkUser = 19;
        static final int m_connectionClosing = 5;
        static final int m_createQueue = 6;
        static final int m_createTopic = 7;
  @@ -123,13 +123,15 @@
        static final int m_getID = 9;
        static final int m_getTemporaryQueue = 10;
        static final int m_getTemporaryTopic = 11;
  -     static final int m_listenerChange = 12;
        static final int m_receive = 13;
        static final int m_setEnabled = 14;
        static final int m_setSpyDistributedConnection = 15;
        static final int m_subscribe = 16;
        static final int m_transact = 17;
        static final int m_unsubscribe = 18;
  +     static final int m_destroySubscription = 19;
  +     static final int m_checkUser = 20;
  +
        boolean running;
        //The server implementation
        protected static JMSServer server;
  @@ -147,7 +149,7 @@
                ObjectInputStream in=null;
                ConnectionToken connectionToken=null;
                boolean closed = false;
  -                   
  +
                try {
                        while( running && socket==null ) {
                                try {
  @@ -161,7 +163,7 @@
   
                        socket.setSoTimeout(0);
                        new Thread(this,"OIL Worker").start();
  -                     
  +
                        out = new ObjectOutputStream(new 
BufferedOutputStream(socket.getOutputStream()));
                        out.flush();
                        in = new ObjectInputStream(new 
BufferedInputStream(socket.getInputStream()));
  @@ -187,54 +189,53 @@
                                category.warn("Connection failure (1).", e);
                                break;
                        }
  -             
  +
                        try {
  -                             
  -                             Object result=null;                                    
 
  -                             
  +
  +                             Object result=null;
  +
                                switch (code)
                                {
                                        case m_setSpyDistributedConnection:
                                                connectionToken = 
(ConnectionToken)in.readObject();
  -                                     category.debug("The UILClientIL Connection is 
set up");
  -                                             break;                                 
         
  +                                       category.debug("The UILClientIL Connection 
is set up");
  +                                             break;
                                        case m_acknowledge:
  -                                             server.acknowledge(connectionToken, 
(AcknowledgementRequest)in.readObject());
  +            AcknowledgementRequest ack = new AcknowledgementRequest();
  +            ack.readExternal(in);
  +                                             server.acknowledge(connectionToken, 
ack);
                                                break;
  -                                     case m_addMessage:                             
                 
  -                                             server.addMessage(connectionToken, 
(SpyMessage)in.readObject());
  +                                     case m_addMessage:
  +                                             server.addMessage(connectionToken, 
SpyMessage.readMessage(in));
                                                break;
  -                                     case m_browse:                                 
         
  +                                     case m_browse:
                                                result=server.browse(connectionToken, 
(Destination)in.readObject(), (String)in.readObject());
                                                break;
  -                                     case m_checkID: 
  +                                     case m_checkID:
                                                
server.checkID((String)in.readObject());
                                                break;
  -                                     case m_connectionClosing: 
  +                                     case m_connectionClosing:
                                                
server.connectionClosing(connectionToken);
                                                closed = true;
                                                break;
  -                                     case m_createQueue: 
  +                                     case m_createQueue:
                                                
result=(Queue)server.createQueue(connectionToken, (String)in.readObject());
                                                break;
  -                                     case m_createTopic: 
  +                                     case m_createTopic:
                                                
result=(Topic)server.createTopic(connectionToken, (String)in.readObject());
                                                break;
  -                                     case m_deleteTemporaryDestination: 
  +                                     case m_deleteTemporaryDestination:
                                                
server.deleteTemporaryDestination(connectionToken, (SpyDestination)in.readObject());
                                                break;
  -                                     case m_getID: 
  +                                     case m_getID:
                                                result=server.getID();
                                                break;
  -                                     case m_getTemporaryQueue: 
  +                                     case m_getTemporaryQueue:
                                                
result=(TemporaryQueue)server.getTemporaryQueue(connectionToken);
                                                break;
                                        case m_getTemporaryTopic:
                                                
result=(TemporaryTopic)server.getTemporaryTopic(connectionToken);
                                                break;
  -                                     case m_listenerChange: 
  -                                             
server.listenerChange(connectionToken,in.readInt(),in.readBoolean());
  -                                             break;
                                        case m_receive:
                                                
result=server.receive(connectionToken,in.readInt(), in.readLong());
                                                break;
  @@ -245,22 +246,27 @@
                                                server.subscribe(connectionToken, 
(Subscription)in.readObject());
                                                break;
                                        case m_transact:
  -                                             server.transact(connectionToken, 
(TransactionRequest)in.readObject());
  +            TransactionRequest trans = new TransactionRequest();
  +            trans.readExternal(in);
  +                                             server.transact(connectionToken, 
trans);
                                                break;
  -                                     case m_unsubscribe: 
  +                                     case m_unsubscribe:
                                                
server.unsubscribe(connectionToken,in.readInt());
  +                                             break;
  +                                     case m_destroySubscription:
  +                                             
server.destroySubscription((DurableSubcriptionID)in.readObject());
                                                break;
  -                                     case m_checkUser: 
  +                                     case m_checkUser:
                                                result = 
server.checkUser((String)in.readObject(), (String)in.readObject());
                                                break;
                                        default:
                                                throw new RemoteException("Bad method 
code !");
                                }
  -                             
  +
                                //Everthing was OK
  -                             
  +
                                try {
  -                                     if (result==null) 
  +                                     if (result==null)
                                                out.writeByte(0);
                                        else {
                                                out.writeByte(1);
  @@ -271,15 +277,15 @@
                                } catch (IOException e) {
                                        if( closed )
                                                break;
  -                                             
  +
                                        category.warn("Connection failure (2).", e);
  -                                     break;                                  
  +                                     break;
                                }
  -                             
  +
                        } catch (Exception e) {
                                if( closed )
                                        break;
  -                                     
  +
                                category.info("Client request resulted in a server 
exception: ", e);
   
                                try {
  @@ -290,10 +296,10 @@
                                } catch (IOException e2) {
                                        if( closed )
                                                break;
  -                                             
  +
                                        category.warn("Connection failure (3).", e);
  -                                     break;                                  
  -                             }                               
  +                                     break;
  +                             }
                        }
                }
   
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to