User: sparre  
  Date: 01/10/05 12:34:14

  Modified:    src/main/org/jboss/mq/server ClientConsumer.java
  Log:
  Created a thread pool with an embedded work queue for JBossMQ,
  and made the message pushers use it.
  
  Revision  Changes    Path
  1.7       +68 -49    jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java
  
  Index: ClientConsumer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- ClientConsumer.java       2001/09/20 03:52:57     1.6
  +++ ClientConsumer.java       2001/10/05 19:34:14     1.7
  @@ -5,6 +5,7 @@
    * See terms of license at gnu.org.
    */
   package org.jboss.mq.server;
  +
   import java.util.HashMap;
   import java.util.Hashtable;
   
  @@ -19,15 +20,20 @@
   
   import org.jboss.mq.xml.XElement;
   
  +import org.jboss.mq.threadpool.ThreadPool;
  +import org.jboss.mq.threadpool.Work;
  +
   /**
    *  This represent the clients queue which consumes messages from the
    *  destinations on the provider.
    *
    * @author     Hiram Chirino ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.6 $
  + * @version    $Revision: 1.7 $
    */
  -public class ClientConsumer implements Runnable {
  +public class ClientConsumer
  +   implements Work
  +{
      //The JMSServer object
      JMSServer        server;
      //The connection this queue will send messages over
  @@ -49,18 +55,32 @@
      private LinkedList messages = new LinkedList();
      //LinkedList of the the temporary destinations that this client created
   //   public LinkedList temporaryDestinations = new LinkedList();
  -   //The message push thread for this consumer connection
  -   private Thread   messagePushThread;
  +
  +   /**
  +    *  Flags that I am enqueued as work on my thread pool.
  +    */
  +   private boolean enqueued = false;
  +
  +   // Static ---------------------------------------------------
  +
  +   /**
  +    *  The {@link org.jboss.mq.threadpool.ThreadPool ThreadPool} that
  +    *  does the actual message pushing for us.
  +    */
  +   private static ThreadPool threadPool = null;
   
      // Constructor ---------------------------------------------------
  +
      public ClientConsumer( JMSServer server, ConnectionToken dc )
         throws JMSException {
         this.server = server;
         this.dc = dc;
         cat = org.apache.log4j.Category.getInstance( ClientConsumer.class.getName() + ":" + dc.getClientID() );
  -      messagePushThread = new Thread( server.threadGroup, this, "Message Pusher " + dc.getClientID() );
  -      messagePushThread.setDaemon( true );
  -      messagePushThread.start();
  +      // Create thread pool
  +      synchronized (ClientConsumer.class) {
  +         if (threadPool == null)
  +            threadPool = new ThreadPool("Message Pushers", server.threadGroup, 10, true);
  +      }
      }
   
      public void setEnabled( boolean enabled )
  @@ -82,10 +102,17 @@
         }
      }
   
  -   public void queueMessageForSending( ReceiveRequest r ) {
  -      synchronized ( messages ) {
  -         messages.add( r );
  -         messages.notify();
  +   public void queueMessageForSending(ReceiveRequest r)
  +   {
  +      synchronized (messages) {
  +         if (closed)
  +            return; // Wouldn't be delivered anyway
  +
  +         messages.add(r);
  +         if (!enqueued) {
  +            threadPool.enqueueWork(this);
  +            enqueued = true;
  +         }
         }
      }
   
  @@ -110,12 +137,15 @@
      }
   
      public void close() {
  -
         cat.debug( "" + this + "->close()" );
   
  -      synchronized ( messages ) {
  +      synchronized (messages) {
            closed = true;
  -         messages.notifyAll();
  +         if (enqueued) {
  +            cat.debug("" + this + "->close(): Cancelling work in progress.");
  +            threadPool.cancelWork(this);
  +            enqueued = false;
  +         }
         }
   
         synchronized ( subscriptions ) {
  @@ -181,45 +211,34 @@
         queue.removeSubscriber( req );
   
      }
  -
   
  -   // Iterate over the consumers asking them to take messages until they stop
  -   // consuming.
  -   public void run() {
  -
  -      cat.debug( "" + this + "->run()" );
  -
  -      while ( true ) {
  -
  -         ReceiveRequest[] job;
  -         synchronized ( messages ) {
  -            while ( messages.size() == 0 ) {
  -               try {
  -                  messages.wait();
  -               } catch ( InterruptedException e ) {
  -               }
  -               if ( closed ) {
  -                  return;
  -               }
  -            }
  -
  -            job = new ReceiveRequest[messages.size()];
  -            job = ( ReceiveRequest[] )messages.toArray( job );
  -            messages.clear();
  -         }
  -
  +   /**
  +    *  Push some messages.
  +    */
  +   public void doWork()
  +   {
  +      ReceiveRequest[] job;
  +
  +      synchronized (messages) {
  +         if (closed)
  +            return;
  +
  +         job = new ReceiveRequest[messages.size()];
  +         job = (ReceiveRequest[])messages.toArray(job);
  +         messages.clear();
  +         enqueued = false;
  +      }
  +
  +      try {
  +         dc.clientIL.receive(job);
  +      } catch (Exception e) {
  +         cat.warn("Could not send messages to a receiver.", e);
            try {
  -            dc.clientIL.receive( job );
  -         } catch ( Exception e ) {
  -            cat.warn( "Could not send messages to a receiver.", e );
  -            try {
  -               server.connectionFailure( dc );
  -            } catch ( Throwable ignore ) {
  -               cat.warn( "Could not close the client connection..", ignore );
  -            }
  +            server.connectionFailure(dc);
  +         } catch (Throwable ignore) {
  +            cat.warn( "Could not close the client connection..", ignore);
            }
         }
  -
      }
   
      public String toString() {
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to