User: chirino 
  Date: 01/06/21 20:16:02

  Modified:    src/main/org/jbossmq SpyConnection.java
                        SpyMessageConsumer.java SpySession.java
  Log:
  The Spy sessions and SpyConnections now use a single system-wide thread
  pool for all async tasks that occur.  This should help reduce the number of
  threads that are used when using multiple connections or when messages are
  being delivered with the help of an MDB.
  
  Revision  Changes    Path
  1.7       +45 -25    jbossmq/src/main/org/jbossmq/SpyConnection.java
  
  Index: SpyConnection.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyConnection.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- SpyConnection.java        2001/05/13 08:51:31     1.6
  +++ SpyConnection.java        2001/06/22 03:16:02     1.7
  @@ -32,13 +32,15 @@
   import org.jbossmq.distributed.interfaces.ConnectionReceiver;
   import org.jbossmq.distributed.interfaces.DistributedJMSServer;
   
  +import EDU.oswego.cs.dl.util.concurrent.*;
  +
   /**
    *   This class implements javax.jms.Connection
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.6 $
  + *   @version $Revision: 1.7 $
    */
   public class SpyConnection implements java.io.Serializable, javax.jms.Connection {
        //////////////////////////////////////////////////////////////
  @@ -90,19 +92,7 @@
                crClassName = crCN;
                spyXAResourceManager = new SpyXAResourceManager(this);
   
  -             // Stop Asynch based programs from exiting out due to no
  -             // non-deamon threads running.  We start one here that lives
  -             // untill the connection is closed().
  -             connectionThread = new Thread("SpyConnection Thread") {
  -                     synchronized public void run() {
  -                             try {
  -                                     this.wait(); // Wait for close() to stop me
  -                             } catch ( InterruptedException e ) {
  -                             }
  -                     }
  -             };
  -             connectionThread.start();
  -             
  +             openExecutor();         
        }
   
        //////////////////////////////////////////////////////////////
  @@ -231,14 +221,8 @@
                        failureHandler(e, "Cannot close properly the connection");
                }
                Log.log("Disconnected from server");
  -
   
  -             // Stop the non deamon thread that we created.
  -             synchronized (connectionThread) {
  -                     connectionThread.notify();
  -                     connectionThread.interrupt();
  -             }
  -             
  +             closeExecutor();
                // Only set the closed flag after all the objects that depend 
                // on this connection have been closed.
                closed = true;
  @@ -588,11 +572,47 @@
                        failureHandler(e, "Cannot acknowlege a message.");
                }
        }
  +
  +
   
  -     // The connectionThread is a non-deamon thread that
  -     // runs until the connection is closed().  This allows
  -     // Writing asynch client programs that do not create
  +     // The Executor holds a pool of non-daemon threads that
  +     // service all the async tasks on the client.
  +     // We make this a class variable to get better pooling.
  +     // 
  +     // We will also keep a count of how many instances of the SpyConnections
  +     // there are so we can destroy the Executor once it is not
  +     // needed.  This will also allow us to cope with
  +     // asynch client programs that do not create
        // long running threads.  The program would terminate when
        // the connection is closed()
  -     Thread connectionThread;
  +     static public Executor execeutor;
  +     static private int spyConnectionCounter=0;
  +
  +     // Releases this connection's claim on the shared Executor and shuts
  +     // the pool down once the last counted connection has released it.
  +     //
  +     // Declared static so that "synchronized" locks on SpyConnection.class:
  +     // the counter and the executor are class variables, and a per-instance
  +     // lock would not stop two connections from updating them concurrently.
  +     static synchronized private void closeExecutor() {
  +             spyConnectionCounter--;
  +             if( spyConnectionCounter == 0 ) {
  +                     // Let the tasks that are already queued finish before the pool stops.
  +                     ((PooledExecutor)execeutor).shutdownAfterProcessingCurrentlyQueuedTasks();
  +             }
  +     }
  +
  +     // Registers this connection with the shared Executor, creating the
  +     // thread pool first when no connection is currently counted.
  +     //
  +     // Declared static so that "synchronized" locks on SpyConnection.class:
  +     // the counter and the executor are class variables, and a per-instance
  +     // lock would not stop two connections from updating them concurrently.
  +     static synchronized private void openExecutor() {
  +         if (spyConnectionCounter == 0) {
  +
  +             // Using a bounded buffer of 10 tasks and a minimum pool size of 1
  +             // (one thread is created up front), but allowing up to 50 threads
  +             // if the buffer gets full, and allowing them to die if they are
  +             // not used for 1 minute.  Clients block if both the buffer is
  +             // full and all 50 threads are busy.
  +             PooledExecutor pool = new PooledExecutor(new BoundedBuffer(10), 50);
  +             pool.setMinimumPoolSize(1);
  +             pool.setKeepAliveTime(1000 * 60 * 1);
  +             pool.createThreads(1);
  +             pool.waitWhenBlocked();
  +
  +             execeutor = pool;
  +         }
  +
  +         // Count every connection, not only the one that created the pool;
  +         // otherwise closeExecutor() would shut the pool down as soon as the
  +         // first connection closes, while other connections still use it.
  +         spyConnectionCounter++;
  +     }
   }
  
  
  
  1.5       +6 -3      jbossmq/src/main/org/jbossmq/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpyMessageConsumer.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- SpyMessageConsumer.java   2001/06/18 23:39:07     1.4
  +++ SpyMessageConsumer.java   2001/06/22 03:16:02     1.5
  @@ -26,7 +26,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
   
  @@ -307,8 +307,11 @@
        }
   
        public void processMessages() throws JMSException {
  +             // Hand the session to the shared executor so that one of its
  +             // threads runs it; nothing is scheduled once this consumer is closed.
  +             try {
                if( !closed )
  -                     session.mutex.notifyLock();
  +                     session.connection.execeutor.execute( session );
  +             } catch ( InterruptedException e ) {
  +                     // execute() was interrupted before the task was handed over.
  +                     // Restore the interrupt status instead of silently dropping
  +                     // it, so the calling thread can still see the interruption.
  +                     Thread.currentThread().interrupt();
  +             }
        }
   
        public Subscription getSubscription() {
  @@ -387,4 +390,4 @@
                        return message;
                }
        }
  -}
  +}
  \ No newline at end of file
  
  
  
  1.4       +3 -27     jbossmq/src/main/org/jbossmq/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/SpySession.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SpySession.java   2001/03/02 01:12:41     1.3
  +++ SpySession.java   2001/06/22 03:16:02     1.4
  @@ -34,7 +34,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   abstract public class SpySession 
        implements Runnable, Session, XASession
  @@ -58,8 +58,7 @@
        protected boolean modeStop;
        //Is the session closed ?
        boolean closed;
  -     // Used to notify the session thread to deliver messages
  -     Mutex mutex;
  +
        // Used to lock the run() method
        Object runLock=new Object();
                
  @@ -80,7 +79,6 @@
                if( xaSession )
                        spyXAResource = new SpyXAResource(this);
   
  -             mutex = new Mutex();
                closed=false;
                consumers = new HashSet();
                                
  @@ -88,21 +86,6 @@
                if( spyXAResource==null && transacted )
                        currentTransactionId = 
connection.spyXAResourceManager.startTx();
                        
  -             //Start my thread 
  -             Thread oneThread=new Thread("SpySession") {
  -                     public void run() {
  -                             mutex.acquireLock();
  -                             while( !closed ) {
  -                                     SpySession.this.run();
  -                                     mutex.waitLock();
  -                             }
  -                             mutex.releaseLock();
  -                     }
  -             };
  -             oneThread.setDaemon(true);
  -             oneThread.start();
  -
  -             mutex.waitLocked();             
        }
   
   
  @@ -196,7 +179,6 @@
                if (closed) throw new IllegalStateException("The session is closed");  
                                          
                sessionConsumer = new SpyMessageConsumer(this, true);
                sessionConsumer.setMessageListener(listener);
  -             mutex.notifyLock();
        }
        
        // Delivers incoming messages in this session
  @@ -277,9 +259,6 @@
                
                connection.sessionClosing(this);
   
  -             // Notify the lock in case it is in wait 
  -             // Since no new events would occur after the close
  -             mutex.notifyLock();
        }
   
        
  @@ -396,11 +375,8 @@
        {
                
                if (closed) throw new IllegalStateException("The session is closed");  
                                                                  
  -             if (modeStop==newValue) return; 
  -             
  +             if (modeStop==newValue) return;                 
                modeStop=newValue;
  -             
  -             mutex.notifyLock();
        }
   
        void addConsumer(SpyMessageConsumer who) throws JMSException
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to