User: dmaplesden
  Date: 01/08/21 17:07:18

  Modified:    src/main/org/jboss/mq SpyConnectionConsumer.java
  Log:
  Implement the loading of multiple messages into each session when under heavy load.
  
  Revision  Changes    Path
  1.5       +26 -9     jbossmq/src/main/org/jboss/mq/SpyConnectionConsumer.java
  
  Index: SpyConnectionConsumer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyConnectionConsumer.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- SpyConnectionConsumer.java        2001/08/17 03:04:01     1.4
  +++ SpyConnectionConsumer.java        2001/08/22 00:07:18     1.5
  @@ -19,7 +19,7 @@
    *
    * @author     Hiram Chirino ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.4 $
  + * @version    $Revision: 1.5 $
    */
   public class SpyConnectionConsumer implements javax.jms.ConnectionConsumer, SpyConsumer, Runnable {
   
  @@ -60,6 +60,8 @@
         this.destination = destination;
         this.serverSessionPool = serverSessionPool;
         this.maxMessages = maxMessages;
  +      if(this.maxMessages < 1)
  +         this.maxMessages = 1;
   
         subscription.destination = ( SpyDestination )destination;
         subscription.messageSelector = messageSelector;
  @@ -142,13 +144,24 @@
   
      //Used to facilitate delivery of messages to sessions from server session pool.
      public void run() {
  -      SpyMessage mes = null;
  +      java.util.ArrayList mesList = new java.util.ArrayList();
         try {
            outer :
            while ( true ) {
  -            //get Message
  -            while ( mes == null ) {
  -               mes = connection.receive( subscription, 0 );
  +            synchronized( queue ){
  +               if(closed)
  +                  break outer;
  +            }
  +            //get Messages
  +            for(int i=0;i<maxMessages;i++){
  +               SpyMessage mes = connection.receive(subscription, -1); //receive no wait
  +               if(mes == null)
  +                  break;
  +               else
  +                  mesList.add(mes);
  +            }
  +            if(mesList.isEmpty()){
  +               SpyMessage mes = connection.receive( subscription, 0 );
                  if ( mes == null ) {
                     synchronized ( queue ) {
                        waitingForMessage = true;
  @@ -166,7 +179,9 @@
                        waitingForMessage = false;
                     }
                  }
  +               mesList.add(mes);
               }
  +
               ServerSession serverSession = serverSessionPool.getServerSession();
               SpySession spySession = ( SpySession )serverSession.getSession();
   
  @@ -176,17 +191,19 @@
                  spySession.sessionConsumer.subscription = subscription;
               }
   
  -            spySession.addMessage( mes );
  +            for(int i=0;i<mesList.size();i++){
  +               spySession.addMessage( (SpyMessage)mesList.get(i) );
  +            }
   
               cat.debug( "" + this + " Starting the ServerSession." );
               serverSession.start();
  -            mes = null;
  +            mesList.clear();
            }
         } catch ( JMSException e ) {
            cat.warn( "Connection consumer closing due to error in listening thread.", e );
            try {
  -            if ( mes != null ) {
  -               connection.send( mes.getAcknowledgementRequest( false ) );
  +            for(int i=0;i<mesList.size();i++){
  +               connection.send( ((SpyMessage)mesList.get(i)).getAcknowledgementRequest( false ) );
               }
               close();
            } catch ( Exception ignore ) {
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to