User: norbert 
  Date: 00/06/14 21:10:01

  Added:       src/java/org/spydermq/distributed/server
                        ConnectionReceiverOIL.java
                        ConnectionReceiverOILClient.java
  Log:
  no message
  
  Revision  Changes    Path
  1.1                  
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
  
  Index: ConnectionReceiverOIL.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.distributed.server;
  
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import org.spydermq.SpyConnection;
  import org.spydermq.ConnectionQueue;
  import org.spydermq.SpyMessage;
  import org.spydermq.SpySession;
  import org.spydermq.SessionQueue;
  import org.spydermq.SpyDestination;
  import org.spydermq.SpyTopicConnection;
  import org.spydermq.SpyQueueSession;
  import org.spydermq.Log;
  import org.spydermq.NoReceiverException;
  import org.spydermq.distributed.interfaces.ConnectionReceiver;
  import java.util.Hashtable;
  import java.util.HashSet;
  import java.util.Iterator;
  import java.rmi.RemoteException; 
  import java.net.ServerSocket;
  import java.net.Socket;
  import java.io.ObjectOutputStream;
  import java.io.ObjectInputStream;
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.io.IOException;
  
  /**
   *    The OIL implementation of the ConnectionReceiver object
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class ConnectionReceiverOIL implements ConnectionReceiver, Runnable
  {
        // Attributes ----------------------------------------------------
  
        //A link on my connection
        private SpyConnection connection;
        //Is my connection closed ?
        private boolean closed;
  
        // Constructor ---------------------------------------------------
           
        public ConnectionReceiverOIL() 
        {
                closed=false;
                exportObject();
        }
  
        // Internals -----------------------------------------------------
        // Should be hold in an "extention" and the ConnectionReceiver logic should be 
in a separated object
        // Why not generate a dynamic proxy for that :)
        
        static final int RECEIVE = 1;
        static final int RECEIVE_MULTIPLE = 2;
        static final int DELETE_TEMPORARY_DESTINATION = 3;
        static final int CLOSE = 4;             
        
        ServerSocket serverSocket;
        
        public void exportObject()      
        {
                try {
                        //Should be dynamic...
                         serverSocket = new ServerSocket(12345);
                         new Thread(this).start();
                } catch (IOException e) {
                        Log.error("Bug");
                }
        }
        
        public void run()
        {
                Socket socket = null;
                int code = 0;
                InputStream is=null;
                OutputStream os=null;
                ObjectOutputStream out=null;
                ObjectInputStream in=null;
                      
                try {
                        socket = serverSocket.accept();
                    
                        new Thread(this).start();
  
                        is=socket.getInputStream();
                        os=socket.getOutputStream();
                        out = new ObjectOutputStream(os);
                        in = new ObjectInputStream(is);
  
                } catch (IOException e) {
                        Log.error("Bug");
                }
  
                while (true) {
  
                        Log.log("Wait for command");
                
                        try {
                                code=in.read();         
                        } catch (IOException e) {
                                Log.error("Bug");
                        }
                
                        try {
                                
                                Log.log("code = "+code);
                                
                                switch (code)
                                {
                                        case RECEIVE: 
                                                
receive((SpyDestination)in.readObject(),(SpyMessage)in.readObject());
                                                break;
                                        case RECEIVE_MULTIPLE: 
                                                
receiveMultiple((SpyDestination)in.readObject(),(SpyMessage[])in.readObject());
                                                break;
                                        case DELETE_TEMPORARY_DESTINATION:
                                                
deleteTemporaryDestination((SpyDestination)in.readObject());
                                                break;
                                        case CLOSE: 
                                                 close();
                                                break;
                                        default:
                                                throw new RemoteException("Bad method 
code !");
                                }
                                
                                //Everthing was OK
                                Log.log("OK !");
                                
                                try {
                                        os.write(0);
                                } catch (IOException e) {
                                        Log.error("Bug");
                                }
                                
                        } catch (Exception e) {
                                Log.log("Throw exception");
                                Log.error(e);                   
  
                                try {
                                        os.write(1);
                                } catch (IOException e2) {
                                        Log.error("Bug");
                                }
                                
                        }
  
                }
        }
        
        // Public --------------------------------------------------------      
  
        public void setConnection(SpyConnection connection)
        {
                this.connection=connection;
        }
        
        //A message has arrived for this Connection, We have to dispatch it to the 
sessions
      public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");
                
                Log.log("ConnectionReceiver: 
Receive(Destination="+dest.toString()+",Mes="+mes.toString()+")");
                
                if (connection instanceof SpyTopicConnection) {
                
                        //Get the set of subscribers for this Topic
                
                        ConnectionQueue 
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);             
                        if (connectionQueue==null) return;
        
                        Iterator i=connectionQueue.subscribers.iterator();
                                        
                        while (i.hasNext()) {                                          
                 
                                        
                                SpySession session=(SpySession)i.next();
                                        
                                //add the new message to the session's queue
                                session.dispatchMessage(dest,mes);
                                        
                                //There is work to do... 
                                session.mutex.notifyLock();
                        }
                        
                } else {
                        
                        while (true) {
                                
                                SessionQueue sq=null;
  
                                try {
                        
                                        //Find one session waiting for this Queue
                                        if (connection.modeStop) throw new 
Exception("This connection is stopped !");
                                        ConnectionQueue 
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
                                        if (connectionQueue==null) throw new 
Exception("There is no connectionQueue for this destination !");
                                        
                                        synchronized (connectionQueue) {
                                                
                                                //Find a SessionQueue
                                                if 
(connectionQueue.NumListeningSessions==0) throw new Exception("There are no listening 
sessions for this destination !");
                        
                                                Iterator 
i=connectionQueue.subscribers.iterator();
                                                while (i.hasNext()) {
                                                        SpySession 
session=(SpySession)i.next();
                                                        
sq=(SessionQueue)session.destinations.get(dest);
                                                        if 
(sq.NumListeningSubscribers!=0) break;
                                                }
                                                if 
(sq==null||sq.NumListeningSubscribers==0) {
                                                        Log.error("FIXME: The 
listeners count was invalid !");
                                                        throw new Exception("There are 
no listening sessions for this destination !");
                                                }
                                        
                                                //Try with this sessionQueue
                                                sq.dispatchMessage(dest,mes);
                                
                                                //Our work is done here
                                                break;
                                        }
  
                                } catch (NoReceiverException e) {
                                        //This SessionQueue should not have been 
registered !
                                        continue;
                                } catch (Exception e) {
                                        //This error is non-recoverable : we must 
unregister from this queue
                                        //Let the JMSServerQueue do its work
                                        Log.log(e);
                                        throw new NoReceiverException("There are no 
listening sessions in this connection");
                                }
                        }
                        
                }
        } 
  
        public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
JMSException
        {
                for(int i=0;i<mes.length;i++) {
                        receive(dest,mes[i]);
                }
        }
        
      public void close() throws Exception
        {
                closed=true;            
        }
  
        //One TemporaryDestination has been deleted
        public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");        
  
                connection.deleteTemporaryDestination(dest);
        }
  
  }
  
  
  
  1.1                  
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java
  
  Index: ConnectionReceiverOILClient.java
  ===================================================================
  package org.spydermq.distributed.server;
  
  import org.spydermq.distributed.interfaces.ConnectionReceiver;
  import org.spydermq.SpyDestination;
  import org.spydermq.SpyMessage;
  import org.spydermq.Log;
  import org.spydermq.SpyConnection;
  import java.rmi.RemoteException;
  import java.io.ObjectOutputStream;
  import java.io.ObjectInputStream;
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.io.IOException;
  import java.net.Socket;
  
  public class ConnectionReceiverOILClient
        implements ConnectionReceiver
  {
        static final int RECEIVE = 1;
        static final int RECEIVE_MULTIPLE = 2;
        static final int DELETE_TEMPORARY_DESTINATION = 3;
        static final int CLOSE = 4;             
        
        Socket socket;
        InputStream is;
        OutputStream os;
        ObjectOutputStream out;
        ObjectInputStream in;
        
        ConnectionReceiverOILClient() throws RemoteException 
        {
                try {
                        Socket socket=new Socket("localhost",12345);
                        is=socket.getInputStream();
                        os=socket.getOutputStream();
                        in=new ObjectInputStream(is);
                        out=new ObjectOutputStream(os);
                } catch (Exception e) {
                        Log.error(e);
                }
        }
        
        public void waitAnswer() throws Exception 
        {
                try {
                        int val=is.read();
                        if (val==0) Log.log("Return : OK");
                        else {
                                Log.log("Return : Exception");
                                throw new Exception("Remote Exception");
                        }
                } catch (IOException e) {
                        Log.error("IOException while reading the answer");
                        Log.error(e);
                        throw new RemoteException("Cannot contact the Distant object");
                }
        }
        
      public void receive(SpyDestination dest,SpyMessage mes) throws Exception
        {               
                os.write(RECEIVE);
                out.writeObject(dest);
                out.writeObject(mes);
                waitAnswer();
        }
                
      public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
Exception        
        {
                os.write(RECEIVE_MULTIPLE);
                out.writeObject(dest);
                out.writeObject(mes);
                waitAnswer();
        }       
        
      public void deleteTemporaryDestination(SpyDestination dest) throws Exception
        {
                os.write(DELETE_TEMPORARY_DESTINATION);
                out.writeObject(dest);
                waitAnswer();
        }       
      
        public void close() throws Exception
        {
                os.write(CLOSE);
                waitAnswer();
        }
        
        public void setConnection(SpyConnection connection) throws Exception
        {
                //SHOULD NOT be there !!!
        }
  
  }
  
  
  

Reply via email to