User: hiram   
  Date: 00/11/10 11:52:11

  Modified:    src/java/org/spydermq JMSServerQueue.java
  Log:
  P2P messages can now be distributed to multiple listeners
  in a round-robin fashion.  This may come in handy for
  load balancing.
  
  Revision  Changes    Path
  1.14      +100 -17   spyderMQ/src/java/org/spydermq/JMSServerQueue.java
  
  Index: JMSServerQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueue.java,v
  retrieving revision 1.13
  retrieving revision 1.14
  diff -u -r1.13 -r1.14
  --- JMSServerQueue.java       2000/11/04 19:24:47     1.13
  +++ JMSServerQueue.java       2000/11/10 19:52:10     1.14
  @@ -18,7 +18,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.13 $
  + *   @version $Revision: 1.14 $
    */
   public class JMSServerQueue
   {
  @@ -45,6 +45,11 @@
        private LinkedList messagesWaitingForAck;
        //Nb of listeners for this Queue
        int listeners;
  +
  +        // Should we use the round robin aproach to pick the next reciver of a p2p message?
  +        private boolean useRoundRobinMessageDistribution = true;
  +        // Keeps track of the last used connection so that we can do round robin distribution of p2p messages.
  +        private SpyDistributedConnection lastUsedConnection;
        
        // Constructor ---------------------------------------------------
           
  @@ -242,7 +247,78 @@
                //remove this connection from the list
                removeSubscriber(dc,i);
        }
  -     
  +        
  +        /**
  +         * Get a SpyDistributedConnection object that is listening 
  +         * to this queue.  If multiple objects are listening to the queue
  +         * this multiple calls to this method will cycle through them in a round 
  +         * robin fasion.
  +         */
  +        private SpyDistributedConnection pickNextRoundRobinConnection() {           
 
  +             
  +            // No valid next connection will exist, return null
  +            if (listeners==0) return null;
  +            
  +            Iterator i=subscribers.values().iterator();
  +            SpyDistributedConnection firstFoundConnection=null;            
  +            boolean enableSelectNext = false;
  +
  +            while (i.hasNext()) {
  +                SpyDistributedConnection  t =(SpyDistributedConnection)i.next();    
                                 
  +
  +             // Select the next valid connection if we are past the last used 
connection
  +             if( t == lastUsedConnection || lastUsedConnection == null )
  +                     enableSelectNext = true;
  +
  +             // Test to see if the connection is valid pick
  +                if (t.listeners) {
  +                 // Store the first valid connection since the last used might be 
the last
  +                    // in the list 
  +                    if( firstFoundConnection == null )
  +                        firstFoundConnection = t;
  +
  +                    // Are we past the last used? then we have the next item in the 
round robin  
  +                    if( enableSelectNext && t!=lastUsedConnection ) {
  +                        lastUsedConnection = t;
  +                        return t;
  +                    }
  +                }
  +            }
  +            // We got here because we did not find a valid item in the list after 
the last
  +            // used item, so lest use the first valid item 
  +            if( firstFoundConnection != null ) {
  +                lastUsedConnection = firstFoundConnection;
  +                return firstFoundConnection;
  +            } else {
  +                Log.error("FIXME: The listeners count was invalid !");
  +                return null;
  +            }
  +        }
  +
  +        /**
  +         * Get a SpyDistributedConnection object that is listening 
  +         * to this queue.  Picks the first one it can find.
  +         */
  +        private SpyDistributedConnection pickFirstFoundConnection() {            
  +             
  +            // No valid next connection will exist, return null
  +            if (listeners==0) return null;
  +            
  +            Iterator i=subscribers.values().iterator();
  +            while (i.hasNext()) {
  +                SpyDistributedConnection  t =(SpyDistributedConnection)i.next();    
                                 
  +
  +             // Test to see if the connection is valid pick
  +                if (t.listeners) {
  +                    return t;
  +                }
  +            }
  +            
  +            // We got here because we did not find a valid item in the list.
  +            Log.error("FIXME: The listeners count was invalid !");
  +            return null;
  +        }
  +        
        void doMyJob() throws JMSException 
        {                       
                if (isTopic) {                  
  @@ -277,26 +353,33 @@
                                                                        
                                        //At first, find a receiver
                                        //NL: We could find a better receiver (load 
balancing ?)
  +                                     //HC: Using Round Robin should provide some 
load balancing
                                        
                                        Log.log("get a receiver");
  -                                     
  -                                     if (listeners==0) break;
  -                                     Iterator i=subscribers.values().iterator();
  -                                     SpyDistributedConnection dc=null;
  -                                     while (i.hasNext()) {
  -                                             dc=(SpyDistributedConnection)i.next(); 
                                 
  -                                             if (dc.listeners) break;
  -                                     }
                                        
  -                                     if (dc==null||!dc.listeners) {
  -                                             Log.error("FIXME: The listeners count was invalid !");
  -                                             break;
  -                                     }
  -
  +                                        // we may have to restore the 
lastUsedConnection
  +                                        // if message on the queue is not sent. (we 
don't want to skip 
  +                                        // destination in the round robin)
  +                                     SpyDistributedConnection saveLastConnection=lastUsedConnection;
  +                                     SpyDistributedConnection dc;
  +                                        
  +                                        if( useRoundRobinMessageDistribution ) {
  +                                            dc=pickNextRoundRobinConnection();
  +                                        } else {
  +                                            dc=pickFirstFoundConnection();
  +                                        }
  +                                        if ( dc == null ) break;
  +                                        
                                        //Get the message ( if there is one message 
pending )
                                        SpyMessage mes=startWorkQueue();
  -                                     if (mes==null) break;
  -                                     if (mes.isOutdated()) continue;
  +                                     if (mes==null) {
  +                                             lastUsedConnection=saveLastConnection;
  +                                             break;
  +                                     }
  +                                     if (mes.isOutdated()) {
  +                                             lastUsedConnection=saveLastConnection;
  +                                             continue;
  +                                     }
                                                                        
                                        //Send the message
                                        try {
  
  
  

Reply via email to