User: dmaplesden
Date: 01/08/21 17:07:18
Modified: src/main/org/jboss/mq SpyConnectionConsumer.java
Log:
Implement the loading of multiple messages into each session when under heavy load.
Revision Changes Path
1.5 +26 -9 jbossmq/src/main/org/jboss/mq/SpyConnectionConsumer.java
Index: SpyConnectionConsumer.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyConnectionConsumer.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SpyConnectionConsumer.java 2001/08/17 03:04:01 1.4
+++ SpyConnectionConsumer.java 2001/08/22 00:07:18 1.5
@@ -19,7 +19,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class SpyConnectionConsumer implements javax.jms.ConnectionConsumer,
SpyConsumer, Runnable {
@@ -60,6 +60,8 @@
this.destination = destination;
this.serverSessionPool = serverSessionPool;
this.maxMessages = maxMessages;
+ if(this.maxMessages < 1)
+ this.maxMessages = 1;
subscription.destination = ( SpyDestination )destination;
subscription.messageSelector = messageSelector;
@@ -142,13 +144,24 @@
//Used to facilitate delivery of messages to sessions from server session pool.
public void run() {
- SpyMessage mes = null;
+ java.util.ArrayList mesList = new java.util.ArrayList();
try {
outer :
while ( true ) {
- //get Message
- while ( mes == null ) {
- mes = connection.receive( subscription, 0 );
+ synchronized( queue ){
+ if(closed)
+ break outer;
+ }
+ //get Messages
+ for(int i=0;i<maxMessages;i++){
+ SpyMessage mes = connection.receive(subscription, -1); //receive no wait
+ if(mes == null)
+ break;
+ else
+ mesList.add(mes);
+ }
+ if(mesList.isEmpty()){
+ SpyMessage mes = connection.receive( subscription, 0 );
if ( mes == null ) {
synchronized ( queue ) {
waitingForMessage = true;
@@ -166,7 +179,9 @@
waitingForMessage = false;
}
}
+ mesList.add(mes);
}
+
ServerSession serverSession = serverSessionPool.getServerSession();
SpySession spySession = ( SpySession )serverSession.getSession();
@@ -176,17 +191,19 @@
spySession.sessionConsumer.subscription = subscription;
}
- spySession.addMessage( mes );
+ for(int i=0;i<mesList.size();i++){
+ spySession.addMessage( (SpyMessage)mesList.get(i) );
+ }
cat.debug( "" + this + " Starting the ServerSession." );
serverSession.start();
- mes = null;
+ mesList.clear();
}
} catch ( JMSException e ) {
cat.warn( "Connection consumer closing due to error in listening thread.", e );
try {
- if ( mes != null ) {
- connection.send( mes.getAcknowledgementRequest( false ) );
+ for(int i=0;i<mesList.size();i++){
+ connection.send( ((SpyMessage)mesList.get(i)).getAcknowledgementRequest( false ) );
}
close();
} catch ( Exception ignore ) {
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development