User: hiram
Date: 00/11/10 11:52:11
Modified: src/java/org/spydermq JMSServerQueue.java
Log:
P2P messages can now be distributed to multiple listeners
in a round-robin fashion. This might come in handy to do
some load balancing.
Revision Changes Path
1.14 +100 -17 spyderMQ/src/java/org/spydermq/JMSServerQueue.java
Index: JMSServerQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueue.java,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -r1.13 -r1.14
--- JMSServerQueue.java 2000/11/04 19:24:47 1.13
+++ JMSServerQueue.java 2000/11/10 19:52:10 1.14
@@ -18,7 +18,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.13 $
+ * @version $Revision: 1.14 $
*/
public class JMSServerQueue
{
@@ -45,6 +45,11 @@
private LinkedList messagesWaitingForAck;
//Nb of listeners for this Queue
int listeners;
+
+ // Should we use the round robin aproach to pick the next reciver of a p2p message?
+ private boolean useRoundRobinMessageDistribution = true;
+ // Keeps track of the last used connection so that we can do round robin distribution of p2p messages.
+ private SpyDistributedConnection lastUsedConnection;
// Constructor ---------------------------------------------------
@@ -242,7 +247,78 @@
//remove this connection from the list
removeSubscriber(dc,i);
}
-
+
+ /**
+ * Get a SpyDistributedConnection object that is listening
+ * to this queue. If multiple objects are listening to the queue
+ * this multiple calls to this method will cycle through them in a round
+ * robin fasion.
+ */
+ private SpyDistributedConnection pickNextRoundRobinConnection() {
+
+ // No valid next connection will exist, return null
+ if (listeners==0) return null;
+
+ Iterator i=subscribers.values().iterator();
+ SpyDistributedConnection firstFoundConnection=null;
+ boolean enableSelectNext = false;
+
+ while (i.hasNext()) {
+ SpyDistributedConnection t =(SpyDistributedConnection)i.next();
+
+ // Select the next valid connection if we are past the last used connection
+ if( t == lastUsedConnection || lastUsedConnection == null )
+ enableSelectNext = true;
+
+ // Test to see if the connection is valid pick
+ if (t.listeners) {
+ // Store the first valid connection since the last used might be the last
+ // in the list
+ if( firstFoundConnection == null )
+ firstFoundConnection = t;
+
+ // Are we past the last used? then we have the next item in the round robin
+ if( enableSelectNext && t!=lastUsedConnection ) {
+ lastUsedConnection = t;
+ return t;
+ }
+ }
+ }
+ // We got here because we did not find a valid item in the list after the last
+ // used item, so lest use the first valid item
+ if( firstFoundConnection != null ) {
+ lastUsedConnection = firstFoundConnection;
+ return firstFoundConnection;
+ } else {
+ Log.error("FIXME: The listeners count was invalid !");
+ return null;
+ }
+ }
+
+ /**
+ * Get a SpyDistributedConnection object that is listening
+ * to this queue. Picks the first one it can find.
+ */
+ private SpyDistributedConnection pickFirstFoundConnection() {
+
+ // No valid next connection will exist, return null
+ if (listeners==0) return null;
+
+ Iterator i=subscribers.values().iterator();
+ while (i.hasNext()) {
+ SpyDistributedConnection t =(SpyDistributedConnection)i.next();
+
+ // Test to see if the connection is valid pick
+ if (t.listeners) {
+ return t;
+ }
+ }
+
+ // We got here because we did not find a valid item in the list.
+ Log.error("FIXME: The listeners count was invalid !");
+ return null;
+ }
+
void doMyJob() throws JMSException
{
if (isTopic) {
@@ -277,26 +353,33 @@
//At first, find a receiver
//NL: We could find a better receiver (load balancing ?)
+ //HC: Using Round Robin should provide some load balancing
Log.log("get a receiver");
-
- if (listeners==0) break;
- Iterator i=subscribers.values().iterator();
- SpyDistributedConnection dc=null;
- while (i.hasNext()) {
- dc=(SpyDistributedConnection)i.next();
- if (dc.listeners) break;
- }
- if (dc==null||!dc.listeners) {
- Log.error("FIXME: The listeners count was invalid !");
- break;
- }
-
+ // we may have to restore the lastUsedConnection
+ // if message on the queue is not sent. (we don't want to skip
+ // destination in the round robin)
+ SpyDistributedConnection saveLastConnection=lastUsedConnection;
+ SpyDistributedConnection dc;
+
+ if( useRoundRobinMessageDistribution ) {
+ dc=pickNextRoundRobinConnection();
+ } else {
+ dc=pickFirstFoundConnection();
+ }
+ if ( dc == null ) break;
+
//Get the message ( if there is one message pending )
SpyMessage mes=startWorkQueue();
- if (mes==null) break;
- if (mes.isOutdated()) continue;
+ if (mes==null) {
+ lastUsedConnection=saveLastConnection;
+ break;
+ }
+ if (mes.isOutdated()) {
+ lastUsedConnection=saveLastConnection;
+ continue;
+ }
//Send the message
try {