User: pkendall
  Date: 01/08/08 17:28:42

  Modified:    src/main/org/jbossmq/il/uil UILClientIL.java
                        UILClientILService.java UILServerIL.java
                        UILServerILService.java
  Log:
  Major updates (especially to topics).
  Speed improvements.
  Make JVM IL work (by using a singleton JMSServer).
  Message Listeners re-implemented using client-side thread.
  
  Revision  Changes    Path
  1.3       +9 -7      jbossmq/src/main/org/jbossmq/il/uil/UILClientIL.java
  
  Index: UILClientIL.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/uil/UILClientIL.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- UILClientIL.java  2001/07/16 02:51:46     1.2
  +++ UILClientIL.java  2001/08/09 00:28:42     1.3
  @@ -24,11 +24,11 @@
   
   /**
    *   The RMI implementation of the ConnectionReceiver object
  - *      
  + *
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
  - * 
  - *   @version $Revision: 1.2 $
  + *
  + *   @version $Revision: 1.3 $
    */
   public class UILClientIL implements ClientIL, java.io.Serializable {
   
  @@ -53,7 +53,9 @@
                checkSocket();
                cat.debug("Writing request");
                out.writeByte(m_receive);
  -             out.writeObject(messages);
  +    out.writeInt(messages.length);
  +    for(int i=0;i<messages.length;++i)
  +             messages[i].writeExternal(out);
                cat.debug("Waiting for awnser");
                waitAnswer();
                cat.debug("Done");
  @@ -78,7 +80,7 @@
                        in=new ObjectInputStream(new 
BufferedInputStream(mSocket.getInputStream(2)));
                        out=new ObjectOutputStream(new 
BufferedOutputStream(mSocket.getOutputStream(2)));
                        out.flush();
  -             } catch (Exception e) {                 
  +             } catch (Exception e) {
                        throw new RemoteException("Cannot connect to the 
ConnectionReceiver/Server");
                }
        }
  @@ -91,15 +93,15 @@
                        out.flush();
                        int val=in.readByte();
                        switch(val) {
  -                     case 1:                         
  +                     case 1:
                                Exception e=(Exception)in.readObject();
                                throwException = new RemoteException("", e);
                                break;
                        }
  -             } catch (IOException e) {            
  +             } catch (IOException e) {
                        throw new RemoteException("Cannot contact the remote 
object",e);
                }
  -             
  +
                if( throwException != null )
                        throw throwException;
        }
  
  
  
  1.4       +14 -8     jbossmq/src/main/org/jbossmq/il/uil/UILClientILService.java
  
  Index: UILClientILService.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/uil/UILClientILService.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- UILClientILService.java   2001/08/01 01:18:22     1.3
  +++ UILClientILService.java   2001/08/09 00:28:42     1.4
  @@ -25,11 +25,11 @@
   
   /**
    *   The RMI implementation of the ConnectionReceiver object
  - *      
  + *
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class UILClientILService implements org.jbossmq.il.ClientILService, Runnable 
{
        //the client IL
  @@ -54,16 +54,16 @@
         * start method comment.
         */
        public void start() throws java.lang.Exception {
  -             
  +
                running = true;
                worker = new Thread(connection.threadGroup, this, 
"UILClientILService");
                worker.setDaemon(true);
  -             worker.start();         
  -             
  +             worker.start();
  +
        }
   
        /**
  -      * 
  +      *
         */
        public void stop() throws java.lang.Exception {
                running = false;
  @@ -84,7 +84,7 @@
   
        public void run() {
                cat.debug("UILClientILService.run()");
  -             
  +
                Socket socket = null;
                int code = 0;
   
  @@ -120,7 +120,13 @@
   
                                switch (code) {
                                        case m_receive:
  -                                             
connection.asynchDeliver((org.jbossmq.ReceiveRequest[])in.readObject());
  +            int numReceives = in.readInt();
  +            org.jbossmq.ReceiveRequest [] messages = new 
org.jbossmq.ReceiveRequest[numReceives];
  +            for(int i=0;i<numReceives;++i){
  +              messages[i] = new ReceiveRequest();
  +              messages[i].readExternal(in);
  +            }
  +                                             connection.asynchDeliver(messages);
                                                break;
                                        case m_deleteTemporaryDestination:
                                                
connection.asynchDeleteTemporaryDestination((SpyDestination) in.readObject());
  
  
  
  1.3       +19 -18    jbossmq/src/main/org/jbossmq/il/uil/UILServerIL.java
  
  Index: UILServerIL.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/uil/UILServerIL.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- UILServerIL.java  2001/07/16 02:51:46     1.2
  +++ UILServerIL.java  2001/08/09 00:28:42     1.3
  @@ -20,6 +20,7 @@
   import org.jbossmq.ConnectionToken;
   import org.jbossmq.il.ServerIL;
   import org.jbossmq.server.JMSServer;
  +import org.jbossmq.DurableSubcriptionID;
   
   import org.jbossmq.il.uil.multiplexor.SocketMultiplexor;
   import java.io.ObjectInputStream;
  @@ -32,11 +33,11 @@
   
   /**
    *   The JVM implementation of the ServerIL object
  - *      
  + *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *   @author Norbert Lataille ([EMAIL PROTECTED])
  - * 
  - *   @version $Revision: 1.2 $
  + *
  + *   @version $Revision: 1.3 $
    */
   public class UILServerIL implements ServerIL, java.io.Serializable, Cloneable {
   
  @@ -49,7 +50,7 @@
        public ServerIL cloneServerIL() throws Exception {
                return (ServerIL)clone();
        }
  -     
  +
        synchronized public void setConnectionToken(ConnectionToken dest) throws 
Exception {
                checkConnection();
                out.writeByte(m_setSpyDistributedConnection);
  @@ -66,7 +67,7 @@
        synchronized public void addMessage(ConnectionToken dc, SpyMessage val) throws 
Exception {
                checkConnection();
                out.writeByte(m_addMessage);
  -             out.writeObject(val);
  +             SpyMessage.writeMessage(val,out);
                waitAnswer();
        }
   
  @@ -119,7 +120,7 @@
        synchronized public void acknowledge(ConnectionToken dc, 
AcknowledgementRequest item) throws JMSException, Exception {
                checkConnection();
                out.writeByte(m_acknowledge);
  -             out.writeObject(item);
  +             item.writeExternal(out);
                waitAnswer();
        }
   
  @@ -131,14 +132,6 @@
                return (SpyMessage[]) waitAnswer();
        }
   
  -     synchronized public void listenerChange(ConnectionToken dc, int subscriberId, 
boolean state) throws Exception, Exception {
  -             checkConnection();
  -             out.writeByte(m_listenerChange);
  -             out.writeInt(subscriberId);
  -             out.writeBoolean(state);
  -             waitAnswer();
  -     }
  -
        synchronized public SpyMessage receive(ConnectionToken dc, int subscriberId, 
long wait) throws Exception, Exception {
                checkConnection();
                out.writeByte(m_receive);
  @@ -161,12 +154,19 @@
                waitAnswer();
        }
   
  +     synchronized public void destroySubscription(DurableSubcriptionID id) throws 
JMSException, Exception {
  +             checkConnection();
  +             out.writeByte(m_destroySubscription);
  +             out.writeObject(id);
  +             waitAnswer();
  +     }
  +
        synchronized public String checkUser(String userName, String password) throws 
JMSException, Exception {
                checkConnection();
                out.writeByte(m_checkUser);
                out.writeObject(userName);
                out.writeObject(password);
  -             return (String)waitAnswer();            
  +             return (String)waitAnswer();
        }
   
        synchronized public void subscribe(ConnectionToken dc, 
org.jbossmq.Subscription s) throws JMSException, Exception {
  @@ -179,7 +179,7 @@
        synchronized public void transact(org.jbossmq.ConnectionToken dc, 
TransactionRequest t) throws JMSException, Exception {
                checkConnection();
                out.writeByte(m_transact);
  -             out.writeObject(t);
  +    t.writeExternal(out);
                waitAnswer();
        }
   
  @@ -189,7 +189,6 @@
        static final int m_addMessage = 2;
        static final int m_browse = 3;
        static final int m_checkID = 4;
  -     static final int m_checkUser = 19;
        static final int m_connectionClosing = 5;
        static final int m_createQueue = 6;
        static final int m_createTopic = 7;
  @@ -197,13 +196,15 @@
        static final int m_getID = 9;
        static final int m_getTemporaryQueue = 10;
        static final int m_getTemporaryTopic = 11;
  -     static final int m_listenerChange = 12;
        static final int m_receive = 13;
        static final int m_setEnabled = 14;
        static final int m_setSpyDistributedConnection = 15;
        static final int m_subscribe = 16;
        static final int m_transact = 17;
        static final int m_unsubscribe = 18;
  +     static final int m_destroySubscription = 19;
  +     static final int m_checkUser = 20;
  +
        protected transient SocketMultiplexor mSocket;
        protected transient ObjectOutputStream out;
        //Remote stuff
  
  
  
  1.7       +50 -43    jbossmq/src/main/org/jbossmq/il/uil/UILServerILService.java
  
  Index: UILServerILService.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/uil/UILServerILService.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- UILServerILService.java   2001/08/01 01:18:22     1.6
  +++ UILServerILService.java   2001/08/09 00:28:42     1.7
  @@ -16,6 +16,7 @@
   import org.jbossmq.il.ServerILJMXService;
   import org.jbossmq.il.ServerIL;
   import org.jbossmq.server.JMSServer;
  +import org.jbossmq.DurableSubcriptionID;
   
   import java.net.Socket;
   import javax.jms.JMSException;
  @@ -42,14 +43,14 @@
   import org.jbossmq.SpyDestination;
   
   /**
  - *   Implements the ServerILJMXService which is used to 
  + *   Implements the ServerILJMXService which is used to
    *  manage the JVM IL.
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
  - *   @version $Revision: 1.6 $
  + *   @version $Revision: 1.7 $
    */
   public class UILServerILService extends ServerILJMXService implements 
UILServerILServiceMBean, Runnable {
  -     
  +
        UILServerIL serverIL;
   
        /**
  @@ -77,13 +78,13 @@
   
                serverSocket = new ServerSocket(serverBindPort);
                serverSocket.setSoTimeout(SO_TIMEOUT);
  -             
  +
                category.info("JBossMQ UIL service available at : 
"+InetAddress.getLocalHost().getHostAddress()+ ":"+ serverSocket.getLocalPort());
                worker = new Thread(server.threadGroup, this,"UIL Worker");
                worker.start();
  -             
  +
                serverIL = new UILServerIL(InetAddress.getLocalHost(), 
serverSocket.getLocalPort());
  -             
  +
                bindJNDIReferences();
   
        }
  @@ -115,7 +116,6 @@
        static final int m_addMessage = 2;
        static final int m_browse = 3;
        static final int m_checkID = 4;
  -     static final int m_checkUser = 19;
        static final int m_connectionClosing = 5;
        static final int m_createQueue = 6;
        static final int m_createTopic = 7;
  @@ -130,6 +130,9 @@
        static final int m_subscribe = 16;
        static final int m_transact = 17;
        static final int m_unsubscribe = 18;
  +     static final int m_destroySubscription = 19;
  +     static final int m_checkUser = 20;
  +
        boolean running;
        //The server implementation
        protected static JMSServer server;
  @@ -147,9 +150,9 @@
                ObjectInputStream in=null;
                ConnectionToken connectionToken=null;
                boolean closed = false;
  -                  
  +
                try {
  -                     
  +
                        while( running && socket==null ) {
                                try {
                                        socket = serverSocket.accept();
  @@ -167,7 +170,7 @@
   
                        out = new ObjectOutputStream(new 
BufferedOutputStream(mSocket.getOutputStream(1)));
                        out.flush();
  -                     
  +
                        in = new ObjectInputStream(new 
BufferedInputStream(mSocket.getInputStream(1)));
   
                } catch (IOException e) {
  @@ -187,15 +190,15 @@
                        catch (IOException e) {
                                if( closed || !running )
                                        break;
  -                                     
  +
                                category.warn("Connection failure (1).", e);
                                break;
                        }
  -             
  +
                        try {
  -                             
  -                             Object result=null;                                    
 
  -                             
  +
  +                             Object result=null;
  +
                                switch (code)
                                {
                                        case m_setSpyDistributedConnection:
  @@ -205,44 +208,43 @@
                                                
((UILClientIL)connectionToken.clientIL).createConnection();
                                        category.debug("The UILClientIL Connection is 
set up");
                                                break;
  -                                             
  +
                                        case m_acknowledge:
  -                                             server.acknowledge(connectionToken, 
(AcknowledgementRequest)in.readObject());
  +            AcknowledgementRequest ack = new AcknowledgementRequest();
  +            ack.readExternal(in);
  +                                             server.acknowledge(connectionToken, 
ack);
                                                break;
  -                                     case m_addMessage:                             
                 
  -                                             server.addMessage(connectionToken, 
(SpyMessage)in.readObject());
  +                                     case m_addMessage:
  +                                             server.addMessage(connectionToken, 
SpyMessage.readMessage(in));
                                                break;
  -                                     case m_browse:                                 
         
  +                                     case m_browse:
                                                result=server.browse(connectionToken, 
(Destination)in.readObject(), (String)in.readObject());
                                                break;
  -                                     case m_checkID: 
  +                                     case m_checkID:
                                                
server.checkID((String)in.readObject());
                                                break;
  -                                     case m_connectionClosing: 
  +                                     case m_connectionClosing:
                                                
server.connectionClosing(connectionToken);
                                                closed = true;
                                                break;
  -                                     case m_createQueue: 
  +                                     case m_createQueue:
                                                
result=(Queue)server.createQueue(connectionToken, (String)in.readObject());
                                                break;
  -                                     case m_createTopic: 
  +                                     case m_createTopic:
                                                
result=(Topic)server.createTopic(connectionToken, (String)in.readObject());
                                                break;
  -                                     case m_deleteTemporaryDestination: 
  +                                     case m_deleteTemporaryDestination:
                                                
server.deleteTemporaryDestination(connectionToken, (SpyDestination)in.readObject());
                                                break;
  -                                     case m_getID: 
  +                                     case m_getID:
                                                result=server.getID();
                                                break;
  -                                     case m_getTemporaryQueue: 
  +                                     case m_getTemporaryQueue:
                                                
result=(TemporaryQueue)server.getTemporaryQueue(connectionToken);
                                                break;
                                        case m_getTemporaryTopic:
                                                
result=(TemporaryTopic)server.getTemporaryTopic(connectionToken);
                                                break;
  -                                     case m_listenerChange: 
  -                                             
server.listenerChange(connectionToken,in.readInt(),in.readBoolean());
  -                                             break;
                                        case m_receive:
                                                
result=server.receive(connectionToken,in.readInt(), in.readLong());
                                                break;
  @@ -253,22 +255,27 @@
                                                server.subscribe(connectionToken, 
(Subscription)in.readObject());
                                                break;
                                        case m_transact:
  -                                             server.transact(connectionToken, 
(TransactionRequest)in.readObject());
  +            TransactionRequest trans = new TransactionRequest();
  +            trans.readExternal(in);
  +                                             server.transact(connectionToken, 
trans);
                                                break;
  -                                     case m_unsubscribe: 
  +                                     case m_unsubscribe:
                                                
server.unsubscribe(connectionToken,in.readInt());
  +                                             break;
  +                                     case m_destroySubscription:
  +                                             
server.destroySubscription((DurableSubcriptionID)in.readObject());
                                                break;
  -                                     case m_checkUser: 
  +                                     case m_checkUser:
                                                result = 
server.checkUser((String)in.readObject(), (String)in.readObject());
                                                break;
                                        default:
                                                throw new RemoteException("Bad method 
code !");
                                }
  -                             
  +
                                //Everthing was OK
  -                             
  +
                                try {
  -                                     if (result==null) 
  +                                     if (result==null)
                                                out.writeByte(0);
                                        else {
                                                out.writeByte(1);
  @@ -279,15 +286,15 @@
                                } catch (IOException e) {
                                        if( closed )
                                                break;
  -                                             
  +
                                        category.warn("Connection failure (2).", e);
  -                                     break;                                  
  +                                     break;
                                }
  -                             
  +
                        } catch (Exception e) {
                                if( closed )
                                        break;
  -                                     
  +
                                category.info("Client request resulted in a server 
exception: ", e);
   
                                try {
  @@ -298,10 +305,10 @@
                                } catch (IOException e2) {
                                        if( closed )
                                                break;
  -                                             
  +
                                        category.warn("Connection failure (3).", e);
  -                                     break;                                  
  -                             }                               
  +                                     break;
  +                             }
                        }
                }
   
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to