User: norbert 
  Date: 00/05/30 15:10:19

  Modified:    src/java/org/spyderMQ ConnectionQueue.java JMSServer.java
                        JMSServerQueue.java SessionQueue.java
                        SpyConnection.java SpyDistributedConnection.java
                        SpyQueueSession.java SpySession.java
  Log:
  The P2P (point-to-point, i.e. queue-based) messaging system
  
  Revision  Changes    Path
  1.4       +15 -9     spyderMQ/src/java/org/spyderMQ/ConnectionQueue.java
  
  Index: ConnectionQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/ConnectionQueue.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- ConnectionQueue.java      2000/05/25 01:52:05     1.3
  +++ ConnectionQueue.java      2000/05/30 22:10:17     1.4
  @@ -16,7 +16,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class ConnectionQueue
   {
  @@ -27,7 +27,7 @@
        //the SpySessions linked to this queue
        public HashSet subscribers;
        //Number of listening sessions 
  -     int NumListeningSessions;
  +     public int NumListeningSessions;
        //My SpyConnection
        SpyConnection connection;
   
  @@ -58,17 +58,23 @@
                return subscribers.size()==0;
        }
   
  -     synchronized void changeNumListening(int val)
  +     synchronized void changeNumListening(int val) throws JMSException
        {
                NumListeningSessions+=val;
   
  -             Log.log("ConnectionQueue: changeNumListening("+NumListeningSessions+")");
  +             Log.log("ConnectionQueue: changeNumListening(sessions="+NumListeningSessions+")");
                
  -             /*if (val==-1&&NumListeningSubscribers==0) {
  -                     
((SpyQueueConnection)session.connection).changeNumListening(val);
  -             } else if (val==1&&NumListeningSubscribers==1) {
  -                     
((SpyQueueConnection)session.connection).changeNumListening(val);
  -             }*/
  +             try {           
  +                     
  +                     if (val==-1&&NumListeningSessions==0) {
  +                             
connection.provider.connectionListening(false,destination,connection.distributedConnection);
  +                     } else if (val==1&&NumListeningSessions==1) {
  +                             
connection.provider.connectionListening(true,destination,connection.distributedConnection);
  +                     }
  +                     
  +             } catch (Exception e) {
  +                     connection.failureHandler(e,"Cannot contact the JMS server");
  +             }
        
        }
   }
  
  
  
  1.33      +11 -2     spyderMQ/src/java/org/spyderMQ/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/JMSServer.java,v
  retrieving revision 1.32
  retrieving revision 1.33
  diff -u -r1.32 -r1.33
  --- JMSServer.java    2000/05/25 01:18:33     1.32
  +++ JMSServer.java    2000/05/30 22:10:17     1.33
  @@ -22,7 +22,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.32 $
  + *   @version $Revision: 1.33 $
    */
   public class JMSServer 
                implements Runnable 
  @@ -38,7 +38,7 @@
        //messages pending for a Destination ( HashMap of JMSServerQueue objects )
        private HashMap messageQueue;
        //list of tasks pending ( linked list of JMSServerQueue objects )
  -     LinkedList taskQueue;
  +     LinkedList taskQueue; //look when we unregister a temporaryTopic/Queue
        //last id given to a client 
        private int lastID;     
        //last id given to a temporary topic
  @@ -312,4 +312,13 @@
                
                return serverQueue.queueReceiveNoWait();
        }
  +     
  +     public void connectionListening(boolean mode,Destination 
dest,SpyDistributedConnection dc) throws JMSException
  +     {
  +             JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(dest);
  +             if (serverQueue==null) throw new JMSException("This destination does not exist !");
  +             
  +             serverQueue.connectionListening(mode,dc);
  +     }
  +
   }
  
  
  
  1.28      +92 -14    spyderMQ/src/java/org/spyderMQ/JMSServerQueue.java
  
  Index: JMSServerQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/JMSServerQueue.java,v
  retrieving revision 1.27
  retrieving revision 1.28
  diff -u -r1.27 -r1.28
  --- JMSServerQueue.java       2000/05/19 19:28:49     1.27
  +++ JMSServerQueue.java       2000/05/30 22:10:17     1.28
  @@ -11,14 +11,14 @@
   import java.util.Iterator;
   import java.util.Hashtable;
   import java.util.LinkedList;
  -import java.util.HashSet;
  +import java.util.HashMap;
   
   /**
    *   This class is a message queue which is stored (hashed by Destination) on the 
JMS provider
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.27 $
  + *   @version $Revision: 1.28 $
    */
   public class JMSServerQueue
   {
  @@ -27,7 +27,7 @@
        //the Destination of this queue
        Destination destination;
        //DistributedConnection objs that have "registered" to this Destination
  -     HashSet subscribers;
  +     private HashMap subscribers;
        //List of Pending messages
        private LinkedList messages;
        //Is a thread already working on this queue ? 
  @@ -43,13 +43,15 @@
        boolean isTopic;
        //List of messages waiting for acknowledgment
        private LinkedList messagesWaitingForAck;
  +     //Nb of listeners for this Queue
  +     int listeners;
        
        // Constructor ---------------------------------------------------
           
        JMSServerQueue(Destination dest,SpyDistributedConnection temporary,JMSServer 
server)
        {
                destination=dest;
  -             subscribers=new HashSet();
  +             subscribers=new HashMap();
                messages=new LinkedList();
                threadWorking=false;
                alreadyInTaskQueue=false;
  @@ -57,6 +59,7 @@
                this.server=server;
                messagesWaitingForAck=new LinkedList();
                isTopic=dest instanceof SpyTopic;
  +             listeners=0;
        }
        
        // Package protected ---------------------------------------------
  @@ -66,7 +69,7 @@
                //We want to avoid removeSubscriber, addSubscriber or sendOneMessage 
to work concurently
                synchronized (destination) {
                        if 
(temporaryDestination!=null&&!temporaryDestination.equals(dc)) throw new 
JMSException("You cannot subscriber to this temporary destination");
  -                     subscribers.add(dc);
  +                     subscribers.put(dc.getClientID(),dc);
                }
        }
   
  @@ -74,7 +77,13 @@
        {
                //We want to avoid removeSubscriber, addSubscriber or sendOneMessage 
to work concurently
                synchronized (destination) {
  +                     
  +                     SpyDistributedConnection 
distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
  +                     if (distributedConnection==null) return;
  +                     listeners-=distributedConnection.listeners;
  +                     
                        subscribers.remove(dc);
  +                                                                     
                }
        }
   
  @@ -103,9 +112,9 @@
                        
                        if (isTopic) {
                                //if a thread is already working on this destination, 
I don't have to myself to the taskqueue
  -                             if (!threadWorking) notifyWorkers();                   
                                 
  +                             if (!threadWorking) notifyWorkers();
                        } else {
  -                             Log.log("Queue: addMessage");
  +                             if (listeners!=0&&!threadWorking) notifyWorkers();
                        }
   
                }
  @@ -126,6 +135,18 @@
                }
        }
        
  +     synchronized SpyMessage startWorkQueue()
  +     {
  +             synchronized (messages) {
  +                     
  +                     threadWorking=true;
  +                     alreadyInTaskQueue=false;
  +
  +                     if (messages.size()==0) return null;
  +                     return (SpyMessage)messages.removeFirst();
  +             }
  +     }
  +     
        void endWork()
        {
                //The thread has finished his work...
  @@ -136,7 +157,7 @@
                                //notify another thread if there is work to do !
                                if (!messages.isEmpty()) notifyWorkers();
                        } else {
  -                             Log.log("Queue: endWork");
  +                             if (listeners!=0&&!messages.isEmpty()) notifyWorkers();
                        }
                }
        }
  @@ -147,7 +168,7 @@
                synchronized (subscribers) {
                        if (subscribers.isEmpty()) return;
                                
  -                     Iterator i=subscribers.iterator();
  +                     Iterator i=subscribers.values().iterator();
                                
                        while (i.hasNext()) {
                                SpyDistributedConnection 
dc=(SpyDistributedConnection)i.next();
  @@ -167,7 +188,7 @@
                synchronized (subscribers) {
                        if (subscribers.isEmpty()) return;
                                
  -                     Iterator i=subscribers.iterator();
  +                     Iterator i=subscribers.values().iterator();
                                
                        while (i.hasNext()) {
                                SpyDistributedConnection 
dc=(SpyDistributedConnection)i.next();
  @@ -185,7 +206,7 @@
        //A connection is closing
        void connectionClosing(SpyDistributedConnection dc)
        {
  -             if (!subscribers.contains(dc)) return;
  +             if (!subscribers.containsKey(dc.getClientID())) return;
                Log.log("Warning: The DistributedConnection was still registered for 
"+destination);
                removeSubscriber(dc);
        }
  @@ -211,7 +232,8 @@
                server.connectionClosing(dc,this);
                
                //remove this connection from the list
  -             i.remove();
  +             if (i!=null) i.remove();
  +             else subscribers.remove(dc.getClientID());
        }
        
        void doMyJob()
  @@ -219,7 +241,7 @@
                if (isTopic) {                  
                        
                        //Clear the message queue
  -                     SpyMessage[] msgs=startWork();                  
  +                     SpyMessage[] msgs=startWork();
                                
                        //Let the thread do its work
                        if (msgs.length>1) {
  @@ -240,8 +262,41 @@
                        
                } else {
                        
  -                     Log.log("Queue :)");
  +                     while (true) {
  +                             
  +                             //Get a receiver - NL We could find a better receiver 
(load balancing ?)                                
  +                             if (listeners==0) break;                               
 
  +                             Iterator i=subscribers.values().iterator();
  +                             SpyDistributedConnection dc=null;
  +                             while (i.hasNext()) {
  +                                     dc=(SpyDistributedConnection)i.next();         
                         
  +                                     if (dc.listeners!=0) break;
  +                             }
  +                             if (dc==null||dc.listeners==0) {
  +                                     listeners=0;
  +                                     Log.log("WARNING: The listeners count was 
invalid !");
  +                                     break;
  +                             }
  +                             
  +                             //Get the message
  +                             SpyMessage mes=startWorkQueue();
  +                             if (mes==null) break;
  +                             if (mes.isOutdated()) continue;
  +                             
  +                             //Send the message
  +                             try {
  +                                     dc.cr.receive(destination,mes);
  +                             } catch (Exception e) {
  +                                     Log.error("Cannot deliver this message to the 
client "+dc);                                     
  +                                     Log.error(e);
  +                                     handleConnectionFailure(dc,null);
  +                             } 
  +                             
  +                     }
                        
  +                     //Notify that it has finished its work : another thread can 
start working on this queue
  +                     endWork();
  +                                             
                }
        }
        
  @@ -253,4 +308,27 @@
                }
        }
                
  +     void connectionListening(boolean mode,SpyDistributedConnection dc) throws 
JMSException 
  +     {
  +             SpyDistributedConnection 
distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
  +             if (distributedConnection==null) throw new JMSException("This 
DistributedConnection is not registered");
  +             
  +             if (mode) {
  +                     distributedConnection.listeners++;
  +                     listeners++;
  +
  +                     if (listeners==1&&!threadWorking)
  +                             synchronized (messages) {
  +                                     if (!messages.isEmpty()) notifyWorkers();
  +                             }
  +             
  +             } else {
  +                     distributedConnection.listeners--;
  +                     listeners--;
  +             }
  +
  +             Log.log("Listeners for "+destination+" = "+listeners);
  +             
  +     }
  +     
   }
  
  
  
  1.18      +44 -3     spyderMQ/src/java/org/spyderMQ/SessionQueue.java
  
  Index: SessionQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SessionQueue.java,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -r1.17 -r1.18
  --- SessionQueue.java 2000/05/25 01:52:05     1.17
  +++ SessionQueue.java 2000/05/30 22:10:17     1.18
  @@ -21,7 +21,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.17 $
  + *   @version $Revision: 1.18 $
    */
   public class SessionQueue
   {
  @@ -32,9 +32,9 @@
        //List of messages waiting for acknoledgment
        LinkedList messagesWaitingForAck;
        //the MessageConsumers linked to this queue
  -     HashSet subscribers;
  +     public HashSet subscribers;
        //Number of listening receivers 
  -     int NumListeningSubscribers;
  +     public int NumListeningSubscribers;
        //My SpySession
        SpySession session;
   
  @@ -175,5 +175,46 @@
                }
        
        }
  +     
  +     
  +     public void dispatchMessage(Destination dest, SpyMessage mes) throws 
JMSException
  +     {       
  +             Log.log("SessionQueue: 
dispatchMessage(Destination="+dest.toString()+",Mes="+mes.toString()+")");
  +             
  +             if (session.closed) throw new IllegalStateException("The session is 
closed");           
  +             if (NumListeningSubscribers==0) throw new JMSException("There is no 
receiver for this queue !"); //We should catch this error in the JMSServerQueue object
  +             if (mes.isOutdated()) return;
  +             
  +             Iterator i=subscribers.iterator();              
  +             SpyQueueReceiver receiver=null;
  +             while (i.hasNext()) {
  +                     receiver=(SpyQueueReceiver)i.next();
  +                     if (receiver.listening) break;
  +             }
  +             if (receiver==null||!receiver.listening) {
  +                     NumListeningSubscribers=0;
  +                     Log.log("WARNING: The listeners count was invalid !");
  +                     throw new JMSException("There is no receiver for this queue 
!"); //We should catch this error in the JMSServerQueue object
  +             }
  +
  +             
  +             synchronized (receiver.messages) {
  +                             
  +                     if (receiver.messageListener==null) {
  +                             if (!receiver.waitInReceive) throw new 
JMSException("There is no receiver for this queue !"); //We should catch this error in 
the JMSServerQueue object
  +                             receiver.addMessage(mes);
  +                             receiver.messages.notify();
  +                     } else {
  +                             receiver.addMessage(mes);                              
 
  +                             receiver.messageListener.onMessage(mes);
  +                     }
  +                                                                                    
                 
  +             }
  +
  +             
  +     }                                                                       
  +             
  +
  +
        
   }
  
  
  
  1.32      +2 -2      spyderMQ/src/java/org/spyderMQ/SpyConnection.java
  
  Index: SpyConnection.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyConnection.java,v
  retrieving revision 1.31
  retrieving revision 1.32
  diff -u -r1.31 -r1.32
  --- SpyConnection.java        2000/05/26 22:37:49     1.31
  +++ SpyConnection.java        2000/05/30 22:10:18     1.32
  @@ -29,7 +29,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.31 $
  + *   @version $Revision: 1.32 $
    */
   public class SpyConnection 
                implements Connection, Serializable
  @@ -373,7 +373,7 @@
                }               
        }
   
  -     protected void failureHandler(Exception e,String reason) throws JMSException
  +     public void failureHandler(Exception e,String reason) throws JMSException
        {
                Log.error(e);
                
  
  
  
  1.5       +2 -1      spyderMQ/src/java/org/spyderMQ/SpyDistributedConnection.java
  
  Index: SpyDistributedConnection.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyDistributedConnection.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- SpyDistributedConnection.java     2000/05/18 02:10:04     1.4
  +++ SpyDistributedConnection.java     2000/05/30 22:10:18     1.5
  @@ -14,13 +14,14 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class SpyDistributedConnection 
        implements Serializable
   {
        private String clientID;
        public ConnectionReceiver cr;
  +     public transient int listeners;
        
        SpyDistributedConnection(String id,ConnectionReceiver cr_)
        {
  
  
  
  1.11      +3 -5      spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java
  
  Index: SpyQueueSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- SpyQueueSession.java      2000/05/25 01:52:05     1.10
  +++ SpyQueueSession.java      2000/05/30 22:10:18     1.11
  @@ -23,7 +23,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.10 $
  + *   @version $Revision: 1.11 $
    */
   public class SpyQueueSession 
        extends SpySession 
  @@ -76,7 +76,7 @@
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
  -             //Not implemented yet
  +             //Not yet implemented
                return createReceiver(queue);
        }
   
  @@ -99,9 +99,7 @@
        //Called by the ConnectionReceiver object : put a new msg in the receiver's 
queue
        public void dispatchMessage(Destination dest,SpyMessage mes) throws 
JMSException
        {
  -             if (closed) throw new IllegalStateException("The session is closed");  
         
  -                                                                             
  -             //Not implemented yet
  +             //Done in the SessionQueue :)
        }
        
        
  
  
  
  1.21      +2 -2      spyderMQ/src/java/org/spyderMQ/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpySession.java,v
  retrieving revision 1.20
  retrieving revision 1.21
  diff -u -r1.20 -r1.21
  --- SpySession.java   2000/05/26 22:37:49     1.20
  +++ SpySession.java   2000/05/30 22:10:18     1.21
  @@ -28,7 +28,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.20 $
  + *   @version $Revision: 1.21 $
    */
   public class SpySession 
        implements Runnable, Session
  @@ -45,7 +45,7 @@
        //The connection object to which this session is linked
        protected SpyConnection connection;
        //HashMap of SessionQueue by Destination
  -     HashMap destinations;
  +     public HashMap destinations;
        //The outgoing message queue 
        protected LinkedList outgoingQueue;
        //The outgoing message queue for messages that have been commited (if the 
session is transacted)
  
  
  

Reply via email to