User: chirino 
  Date: 01/09/19 22:08:21

  Modified:    src/main/org/jboss/jms/asf ServerSessionPoolFactory.java
                        StdServerSession.java StdServerSessionPool.java
                        StdServerSessionPoolFactory.java
  Log:
  Transaction Timeout Fix: an MDB that took a long time to process a message
  would time out the transaction.  We now use the JBossMQ XAResource directly
  to manage the transaction for BMT beans.
  
  Revision  Changes    Path
  1.3       +19 -29    jboss/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java
  
  Index: ServerSessionPoolFactory.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- ServerSessionPoolFactory.java     2001/07/21 04:18:28     1.2
  +++ ServerSessionPoolFactory.java     2001/09/20 05:08:21     1.3
  @@ -1,67 +1,57 @@
   /*
  - * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
  + * JBoss, the OpenSource J2EE webOS
    *
  - * This library is free software; you can redistribute it and/or
  - * modify it under the terms of the GNU Lesser General Public
  - * License as published by the Free Software Foundation; either
  - * version 2 of the License, or (at your option) any later version
  - * 
  - * This library is distributed in the hope that it will be useful,
  - * but WITHOUT ANY WARRANTY; without even the implied warranty of
  - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  - * Lesser General Public License for more details.
  - * 
  - * You should have received a copy of the GNU Lesser General Public
  - * License along with this library; if not, write to the Free Software
  - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  + * Distributable under LGPL license.
  + * See terms of license at gnu.org.
    */
   package org.jboss.jms.asf;
   
   import javax.jms.Connection;
  +import javax.jms.JMSException;
   import javax.jms.MessageListener;
   import javax.jms.ServerSessionPool;
  -import javax.jms.JMSException;
   
   /**
  - * Defines the model for creating <tt>ServerSessionPoolFactory</tt> objects.
  + * Defines the model for creating <tt>ServerSessionPoolFactory</tt> objects. <p>
    *
  - * <p>Created: Wed Nov 29 15:55:21 2000
  + * Created: Wed Nov 29 15:55:21 2000
    *
  - * @author <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a>.
  - * @version $Revision: 1.2 $
  + * @author    <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a> .
  + * @version   $Revision: 1.3 $
    */
   public interface ServerSessionPoolFactory
   {
      /**
       * Set the name of the factory.
       *
  -    * @param name    The name of the factory.
  +    * @param name  The name of the factory.
       */
      void setName(String name);
   
      /**
       * Get the name of the factory.
       *
  -    * @return    The name of the factory.
  +    * @return   The name of the factory.
       */
      String getName();
   
      /**
  -    * Create a new <tt>ServerSessionPool</tt>.
  +    * Create a new <tt>ServerSessionPool</tt> .
       *
       * @param con
       * @param maxSession
       * @param isTransacted
       * @param ack
       * @param listener
  -    * @return                A new pool.
  -    *
  +    * @param isContainerManaged  Description of Parameter
  +    * @return                    A new pool.
       * @throws JMSException
       */
      ServerSessionPool getServerSessionPool(Connection con,
  -                                          int maxSession,
  -                                          boolean isTransacted,
  -                                          int ack,
  -                                          MessageListener listener)
  -      throws JMSException;
  +         int maxSession,
  +         boolean isTransacted,
  +         int ack,
  +         boolean isContainerManaged,
  +         MessageListener listener)
  +          throws JMSException;
   }
  
  
  
  1.8       +307 -150  jboss/src/main/org/jboss/jms/asf/StdServerSession.java
  
  Index: StdServerSession.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSession.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- StdServerSession.java     2001/07/21 20:27:13     1.7
  +++ StdServerSession.java     2001/09/20 05:08:21     1.8
  @@ -1,21 +1,11 @@
   /*
  - * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
  + * JBoss, the OpenSource J2EE webOS
    *
  - * This library is free software; you can redistribute it and/or
  - * modify it under the terms of the GNU Lesser General Public
  - * License as published by the Free Software Foundation; either
  - * version 2 of the License, or (at your option) any later version
  - * 
  - * This library is distributed in the hope that it will be useful,
  - * but WITHOUT ANY WARRANTY; without even the implied warranty of
  - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  - * Lesser General Public License for more details.
  - * 
  - * You should have received a copy of the GNU Lesser General Public
  - * License along with this library; if not, write to the Free Software
  - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  + * Distributable under LGPL license.
  + * See terms of license at gnu.org.
    */
   package org.jboss.jms.asf;
  +import java.lang.reflect.Method;
   
   import javax.jms.JMSException;
   import javax.jms.ServerSession;
  @@ -34,225 +24,392 @@
   import org.jboss.tm.TransactionManagerService;
   
   /**
  - * An implementation of ServerSession.
  + * An implementation of ServerSession. <p>
    *
  - * <p>Created: Thu Dec  7 18:25:40 2000
  + * Created: Thu Dec 7 18:25:40 2000
    *
  - * @author <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a>.
  - * @author <a href="mailto:[EMAIL PROTECTED]";>Jason Dillon</a>
  - * @version $Revision: 1.7 $
  + * @author    <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a> .
  + * @author    <a href="mailto:[EMAIL PROTECTED]";>Jason Dillon</a>
  + * @version   $Revision: 1.8 $
    */
   public class StdServerSession
  -   implements Runnable, ServerSession
  +       implements Runnable, ServerSession
   {
  -   /** Instance logger. */
  +   /**
  +    * Instance logger.
  +    */
      private final Category log = Category.getInstance(this.getClass());
   
  -   /** The server session pool which we belong to. */
  -   private StdServerSessionPool serverSessionPool; // = null;
  +   /**
  +    * The server session pool which we belong to.
  +    */
  +   private StdServerSessionPool serverSessionPool;
  +   // = null;
   
  -   /** Our session resource. */
  -   private Session session; // = null;
  +   /**
  +    * Our session resource.
  +    */
  +   private Session session;
  +   // = null;
   
  -   /** Our XA session resource. */
  -   private XASession xaSession; // = null;
  +   /**
  +    * Our XA session resource.
  +    */
  +   private XASession xaSession;
  +   // = null;
   
  -   /** The transaction manager that we will use for transactions. */
  +   /**
  +    * The transaction manager that we will use for transactions.
  +    */
      private TransactionManager tm;
   
      /**
  -    * Create a <tt>StdServerSession</tt>.
  -    *
  -    * @param pool         The server session pool which we belong to.
  -    * @param session      Our session resource.
  -    * @param xaSession    Our XA session resource.
  +    * Use the session's XAResource directly if we have an JBossMQ XASession.
  +    * this allows us to get around the TX timeout problem when you have
  +    * extensive message processing.
  +    */
  +   private boolean useXAResouceDirectly;
  +
  +   /**
  +    * Create a <tt>StdServerSession</tt> .
       *
  -    * @throws JMSException    Transation manager was not found.
  +    * @param pool              The server session pool which we belong to.
  +    * @param session           Our session resource.
  +    * @param xaSession         Our XA session resource.
  +    * @param containerManaged  Description of Parameter
  +    * @throws JMSException     Transation manager was not found.
  +    * @exception JMSException  Description of Exception
       */
      StdServerSession(final StdServerSessionPool pool,
  -                    final Session session,
  -                    final XASession xaSession)
  -      throws JMSException
  +         final Session session,
  +         final XASession xaSession,
  +         final boolean containerManaged)
  +          throws JMSException
      {
         // assert pool != null
         // assert session != null
  -      
  +
         this.serverSessionPool = pool;
         this.session = session;
         this.xaSession = xaSession;
   
  -      if (log.isDebugEnabled()) {
  -         log.debug("initializing (pool, session, xaSession): " +
  -                   pool + ", " + session + ", " + xaSession);
  +      try
  +      {
  +         this.useXAResouceDirectly = !containerManaged && Class.forName("org.jboss.mq.SpySession").isAssignableFrom(session.getClass());
  +      }
  +      catch (ClassNotFoundException e)
  +      {
  +         this.useXAResouceDirectly = false;
         }
  -      
  +
  +      log.debug("initializing (pool, session, xaSession, useXAResouceDirectly): " +
  +            pool + ", " + session + ", " + xaSession + ", " + useXAResouceDirectly);
  +
         InitialContext ctx = null;
  -      try {
  +      try
  +      {
            ctx = new InitialContext();
            tm = (TransactionManager)
  -            ctx.lookup(TransactionManagerService.JNDI_NAME);
  +               ctx.lookup(TransactionManagerService.JNDI_NAME);
         }
  -      catch (Exception e) {
  +      catch (Exception e)
  +      {
            throw new JMSException("Transation manager was not found");
         }
  -      finally {
  -         if (ctx != null) {
  -            try {
  +      finally
  +      {
  +         if (ctx != null)
  +         {
  +            try
  +            {
                  ctx.close();
  +            }
  +            catch (Exception ignore)
  +            {
               }
  -            catch (Exception ignore) {}
            }
         }
      }
   
      // --- Impl of JMS standard API
  -     
  +
      /**
  -    * Returns the session.
  +    * Returns the session. <p>
       *
  -    * <p>This simply returns what it has fetched from the connection. It is
  -    *    up to the jms provider to typecast it and have a private API to stuff
  -    *    messages into it.
  +    * This simply returns what it has fetched from the connection. It is up to
  +    * the jms provider to typecast it and have a private API to stuff messages
  +    * into it.
       *
  -    * @return    The session.
  +    * @return                  The session.
  +    * @exception JMSException  Description of Exception
       */
      public Session getSession() throws JMSException
      {
         return session;
      }
   
  -   /**
  -    * Start the session and begin consuming messages.
  -    *
  -    * @throws JMSException    No listener has been specified.
  -    */
  -   public void start() throws JMSException {
  -      log.debug("starting invokes on server session");
  -
  -      if (session != null) {
  -         try {
  -            serverSessionPool.getExecutor().execute(this);
  -         }
  -         catch (InterruptedException ignore) {}
  -      }
  -      else {
  -         throw new JMSException("No listener has been specified");
  -      }
  -   }
  -    
      //--- Protected parts, used by other in the package
  -     
  +
      /**
  -    * Runs in an own thread, basically calls the session.run(), it is up
  -    * to the session to have been filled with messages and it will run
  -    * against the listener set in StdServerSessionPool. When it has send
  -    * all its messages it returns.
  -    *
  -    * HC: run() also starts a transaction with the TransactionManager and
  -    * enlists the XAResource of the JMS XASession if a XASession was
  -    * available.  A good JMS implementation should provide the XASession
  -    * for use in the ASF.  So we optimize for the case where we have an
  -    * XASession.  So, for the case where we do not have an XASession and
  -    * the bean is not transacted, we have the unneeded overhead of creating
  -    * a Transaction.  I'm leaving it this way since it keeps the code simpler
  -    * and that case should not be too common (JBossMQ provides XASessions).
  +    * Runs in an own thread, basically calls the session.run(), it is up to the
  +    * session to have been filled with messages and it will run against the
  +    * listener set in StdServerSessionPool. When it has send all its messages it
  +    * returns. HC: run() also starts a transaction with the TransactionManager
  +    * and enlists the XAResource of the JMS XASession if a XASession was
  +    * available. A good JMS implementation should provide the XASession for use
  +    * in the ASF. So we optimize for the case where we have an XASession. So,
  +    * for the case where we do not have an XASession and the bean is not
  +    * transacted, we have the unneeded overhead of creating a Transaction. I'm
  +    * leaving it this way since it keeps the code simpler and that case should
  +    * not be too common (JBossMQ provides XASessions).
       */
  -   public void run() {
  +   public void run()
  +   {
         log.debug("running...");
  -      
  +
  +      log.info("running (pool, session, xaSession, useXAResouceDirectly): " +
  +            ", " + session + ", " + xaSession + ", " + useXAResouceDirectly);
  +
  +      // Used if run with useXAResouceDirectly if true
  +      JBossMQTXInterface jbossMQTXInterface = null;
  +
  +      // Used if run with useXAResouceDirectly if false
         Transaction trans = null;
  -      try {
  -         tm.begin();
  -         trans = tm.getTransaction();
  -         
  -         if (xaSession != null) {
  -            XAResource res = xaSession.getXAResource();
  -            trans.enlistResource(res);
  -            if (log.isDebugEnabled()) {
  -               log.debug("XAResource '"+res+"' enlisted.");
  +      try
  +      {
  +
  +         if (useXAResouceDirectly)
  +         {
  +            // Use JBossMQ One Phase Commit to commit the TX
  +            jbossMQTXInterface = new JBossMQTXInterface(session);
  +            jbossMQTXInterface.startTX();
  +
  +         }
  +         else
  +         {
  +
  +            // Use the TM to control the TX
  +            tm.begin();
  +            trans = tm.getTransaction();
  +
  +            if (xaSession != null)
  +            {
  +               XAResource res = xaSession.getXAResource();
  +               trans.enlistResource(res);
  +               if (log.isDebugEnabled())
  +               {
  +                  log.debug("XAResource '" + res + "' enlisted.");
  +               }
               }
  -         } 
  +         }
  +         //currentTransactionId = connection.spyXAResourceManager.startTx();
   
            // run the session
            session.run();
         }
  -      catch (Exception e) {
  +      catch (Exception e)
  +      {
            log.error("session failed to run; setting rollback only", e);
  -         
  -         try {
  -            // The transaction will be rolledback in the finally
  -            trans.setRollbackOnly();
  -         }
  -         catch (Exception x) {
  -            log.error("failed to set rollback only", x);
  -         }
  -      }
  -      finally {
  -         try {
  -            // Marked rollback
  -            if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
  -               log.info("Rolling back JMS transaction");
  -               // actually roll it back 
  -               trans.rollback();
  -                 
  -               // NO XASession? then manually rollback.  
  -               // This is not so good but
  -               // it's the best we can do if we have no XASession.
  -               if (xaSession == null && serverSessionPool.isTransacted()) {
  -                  session.rollback();
  +
  +         if (useXAResouceDirectly)
  +         {
  +            // Use JBossMQ One Phase Commit to commit the TX
  +            jbossMQTXInterface.setRollbackOnly();
  +         }
  +         else
  +         {
  +
  +            // Mark for tollback TX via TM
  +            try
  +            {
  +               // The transaction will be rolledback in the finally
  +               trans.setRollbackOnly();
  +            }
  +            catch (Exception x)
  +            {
  +               log.error("failed to set rollback only", x);
  +            }
  +         }
  +
  +      }
  +      finally
  +      {
  +         try
  +         {
  +            if (useXAResouceDirectly)
  +            {
  +               // Use JBossMQ One Phase Commit to commit the TX
  +               jbossMQTXInterface.endTX();
  +
  +            }
  +            else
  +            {
  +               // Use the TM to commit the Tx
  +
  +               // Marked rollback
  +               if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
  +               {
  +                  log.info("Rolling back JMS transaction");
  +                  // actually roll it back
  +                  trans.rollback();
  +
  +                  // NO XASession? then manually rollback.
  +                  // This is not so good but
  +                  // it's the best we can do if we have no XASession.
  +                  if (xaSession == null && serverSessionPool.isTransacted())
  +                  {
  +                     session.rollback();
  +                  }
                  }
  -            } else if (trans.getStatus() == Status.STATUS_ACTIVE) {
  -               // Commit tx
  -               // This will happen if
  -               // a) everything goes well
  -               // b) app. exception was thrown
  -               trans.commit();
  -                 
  -               // NO XASession? then manually commit.  This is not so good but
  -               // it's the best we can do if we have no XASession.
  -               if (xaSession == null && serverSessionPool.isTransacted()) {
  -                  session.commit();
  +               else if (trans.getStatus() == Status.STATUS_ACTIVE)
  +               {
  +                  // Commit tx
  +                  // This will happen if
  +                  // a) everything goes well
  +                  // b) app. exception was thrown
  +                  trans.commit();
  +
  +                  // NO XASession? then manually commit.  This is not so good but
  +                  // it's the best we can do if we have no XASession.
  +                  if (xaSession == null && serverSessionPool.isTransacted())
  +                  {
  +                     session.commit();
  +                  }
                  }
               }
  +
            }
  -         catch (Exception e) {
  +         catch (Exception e)
  +         {
               log.error("failed to commit/rollback", e);
            }
  -         
  +
            StdServerSession.this.recycle();
         }
   
         log.debug("done");
      }
  -    
  +
      /**
  -    * This method is called by the ServerSessionPool when it is ready to
  -    * be recycled intot the pool
  +    * Start the session and begin consuming messages.
  +    *
  +    * @throws JMSException  No listener has been specified.
       */
  -   void recycle()
  +   public void start() throws JMSException
      {
  -      serverSessionPool.recycle(this);
  +      log.debug("starting invokes on server session");
  +
  +      if (session != null)
  +      {
  +         try
  +         {
  +            serverSessionPool.getExecutor().execute(this);
  +         }
  +         catch (InterruptedException ignore)
  +         {
  +         }
  +      }
  +      else
  +      {
  +         throw new JMSException("No listener has been specified");
  +      }
      }
   
      /**
       * Called by the ServerSessionPool when the sessions should be closed.
       */
  -   void close() {
  -      if (session != null) {
  -         try {
  +   void close()
  +   {
  +      if (session != null)
  +      {
  +         try
  +         {
               session.close();
  -         } catch (Exception ignore) {}
  -         
  +         }
  +         catch (Exception ignore)
  +         {
  +         }
  +
            session = null;
         }
  -      
  -      if (xaSession != null) {
  -         try {
  +
  +      if (xaSession != null)
  +      {
  +         try
  +         {
               xaSession.close();
  -         } catch (Exception ignore) {}
  +         }
  +         catch (Exception ignore)
  +         {
  +         }
            xaSession = null;
         }
   
         log.debug("closed");
  +   }
  +
  +   /**
  +    * This method is called by the ServerSessionPool when it is ready to be
  +    * recycled intot the pool
  +    */
  +   void recycle()
  +   {
  +      serverSessionPool.recycle(this);
  +   }
  +
  +
  +   /**
  +    * #Description of the Class
  +    */
  +   private static class JBossMQTXInterface
  +   {
  +
  +      static boolean initialzied = false;
  +      static Method getXAResourceManager;
  +      static Method startTx;
  +      static Method endTx;
  +      static Method commit;
  +      static Method rollback;
  +      boolean doRollback = false;
  +      Object xid = null;
  +      Object spyXAResourceManager = null;
  +
  +      JBossMQTXInterface(Session sess) throws Exception
  +      {
  +         if (!initialzied)
  +         {
  +            getXAResourceManager = Class.forName("org.jboss.mq.SpySession").getMethod("getXAResourceManager", new Class[]{});
  +            startTx = Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("startTx", new Class[]{});
  +            endTx = Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("endTx", new Class[]{Object.class, boolean.class});
  +            commit = Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("commit", new Class[]{Object.class, boolean.class});
  +            rollback = Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("rollback", new Class[]{Object.class});
  +            initialzied = true;
  +         }
  +         spyXAResourceManager = getXAResourceManager.invoke(sess, new Object[]{});
  +      }
  +
  +      void setRollbackOnly()
  +      {
  +         doRollback = true;
  +      }
  +
  +      void startTX() throws Exception
  +      {
  +         xid = startTx.invoke(spyXAResourceManager, new Object[]{});
  +      }
  +
  +      void endTX() throws Exception
  +      {
  +         if (doRollback)
  +         {
  +            endTx.invoke(spyXAResourceManager, new Object[]{xid, new Boolean(true)});
  +            rollback.invoke(spyXAResourceManager, new Object[]{xid});
  +         }
  +         else
  +         {
  +            endTx.invoke(spyXAResourceManager, new Object[]{xid, new Boolean(true)});
  +            commit.invoke(spyXAResourceManager, new Object[]{xid, new Boolean(true)});
  +         }
  +      }
      }
   }
  
  
  
  1.13      +193 -136  jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java
  
  Index: StdServerSessionPool.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- StdServerSessionPool.java 2001/08/17 22:22:55     1.12
  +++ StdServerSessionPool.java 2001/09/20 05:08:21     1.13
  @@ -1,128 +1,131 @@
   /*
  - * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
  + * JBoss, the OpenSource J2EE webOS
    *
  - * This library is free software; you can redistribute it and/or
  - * modify it under the terms of the GNU Lesser General Public
  - * License as published by the Free Software Foundation; either
  - * version 2 of the License, or (at your option) any later version
  - *
  - * This library is distributed in the hope that it will be useful,
  - * but WITHOUT ANY WARRANTY; without even the implied warranty of
  - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  - * Lesser General Public License for more details.
  - *
  - * You should have received a copy of the GNU Lesser General Public
  - * License along with this library; if not, write to the Free Software
  - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  + * Distributable under LGPL license.
  + * See terms of license at gnu.org.
    */
   package org.jboss.jms.asf;
  +import EDU.oswego.cs.dl.util.concurrent.Executor;
   
  -import java.util.List;
  +import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
  +import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
   import java.util.ArrayList;
   import java.util.Iterator;
   
  +import java.util.List;
  +
   import javax.jms.Connection;
   import javax.jms.JMSException;
  +import javax.jms.MessageListener;
  +import javax.jms.QueueConnection;
   import javax.jms.ServerSession;
   import javax.jms.ServerSessionPool;
  -import javax.jms.MessageListener;
  +import javax.jms.Session;
   import javax.jms.TopicConnection;
  -import javax.jms.XATopicConnection;
  -import javax.jms.QueueConnection;
   import javax.jms.XAQueueConnection;
  -import javax.jms.Session;
  -import javax.jms.XASession;
   import javax.jms.XAQueueSession;
  +import javax.jms.XASession;
  +import javax.jms.XATopicConnection;
   import javax.jms.XATopicSession;
   
  -import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
  -import EDU.oswego.cs.dl.util.concurrent.Executor;
  -import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
  -
   import org.apache.log4j.Category;
   
   /**
  - * Implementation of ServerSessionPool.
  + * Implementation of ServerSessionPool. <p>
    *
  - * <p>Created: Thu Dec  7 17:02:03 2000
  + * Created: Thu Dec 7 17:02:03 2000
    *
  - * @author <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a>.
  - * @version $Revision: 1.12 $
  + * @author    <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a> .
  + * @version   $Revision: 1.13 $
    */
   public class StdServerSessionPool
  -   implements ServerSessionPool
  +       implements ServerSessionPool
   {
  -   /** The default size of the pool. */
  -   private static final int DEFAULT_POOL_SIZE = 15;
  +   /**
  +    * The default size of the pool.
  +    */
  +   private final static int DEFAULT_POOL_SIZE = 15;
   
  -   /** The thread group which session workers will run. */
  +   /**
  +    * The thread group which session workers will run.
  +    */
      private static ThreadGroup threadGroup =
  -      new ThreadGroup("ASF Session Pool Threads");
  +         new ThreadGroup("ASF Session Pool Threads");
   
  -   /** Instance logger. */
  +   /**
  +    * Instance logger.
  +    */
      private final Category log = Category.getInstance(this.getClass());
   
  -   /** The size of the pool. */
  +   /**
  +    * The size of the pool.
  +    */
      private int poolSize;
   
  -   /** The message acknowledgment mode. */
  +   /**
  +    * The message acknowledgment mode.
  +    */
      private int ack;
  +
  +   /**
  +    * Is the bean container managed?
  +    */
  +   private boolean containerManaged;
   
  -   /** True if this is a transacted session. */
  +   /**
  +    * True if this is a transacted session.
  +    */
      private boolean transacted;
   
  -   /** The session connection. */
  +   /**
  +    * The session connection.
  +    */
      private Connection con;
   
  -   /** The message listener for the session. */
  +   /**
  +    * The message listener for the session.
  +    */
      private MessageListener listener;
   
  -   /** The list of ServerSessions. */
  +   /**
  +    * The list of ServerSessions.
  +    */
      private List sessionPool;
   
  -   /** The executor for processing messages? */
  +   /**
  +    * The executor for processing messages?
  +    */
      private PooledExecutor executor;
   
  -   /** Used to signal when the Pool is being closed down */
  +   /**
  +    * Used to signal when the Pool is being closed down
  +    */
      private boolean closing = false;
   
  -   /** Used during close down to wait for all server sessions to be returned and closed.*/
  -   private int numServerSessions = 0;
  -
      /**
  -    * Construct a <tt>StdServerSessionPool</tt> using the default
  -    * pool size.
  -    *
  -    * @param con
  -    * @param transacted
  -    * @param ack
  -    * @param listener
  +    * Used during close down to wait for all server sessions to be returned and
  +    * closed.
       */
  -   public StdServerSessionPool(final Connection con,
  -                               final boolean transacted,
  -                               final int ack,
  -                               final MessageListener listener)
  -      throws JMSException
  -   {
  -      this(con, transacted, ack, listener, DEFAULT_POOL_SIZE);
  -   }
  +   private int numServerSessions = 0;
   
      /**
  -    * Construct a <tt>StdServerSessionPool</tt> using the default
  -    * pool size.
  +    * Construct a <tt>StdServerSessionPool</tt> using the default pool size.
       *
       * @param con
       * @param transacted
       * @param ack
       * @param listener
       * @param maxSession
  +    * @param isContainerManaged  Description of Parameter
  +    * @exception JMSException    Description of Exception
       */
      public StdServerSessionPool(final Connection con,
  -                               final boolean transacted,
  -                               final int ack,
  -                               final MessageListener listener,
  -                               final int maxSession)
  -      throws JMSException
  +         final boolean transacted,
  +         final int ack,
  +         final boolean isContainerManaged,
  +         final MessageListener listener,
  +         final int maxSession)
  +          throws JMSException
      {
         this.con = con;
         this.ack = ack;
  @@ -130,19 +133,29 @@
         this.transacted = transacted;
         this.poolSize = maxSession;
         this.sessionPool = new ArrayList(maxSession);
  +      this.containerManaged = isContainerManaged;
   
         // setup the worker pool
         executor = new PooledExecutor(poolSize);
         executor.setMinimumPoolSize(0);
  -      executor.setKeepAliveTime(1000*30);
  +      executor.setKeepAliveTime(1000 * 30);
         executor.waitWhenBlocked();
  -      executor.setThreadFactory(new ThreadFactory() {
  +      executor.setThreadFactory(
  +         new ThreadFactory()
  +         {
               private volatile int count = 0;
  -            
  -            public Thread newThread(final Runnable command) {
  +
  +            /**
  +             * #Description of the Method
  +             *
  +             * @param command  Description of Parameter
  +             * @return         Description of the Returned Value
  +             */
  +            public Thread newThread(final Runnable command)
  +            {
                  return new Thread(threadGroup,
  -                                 command,
  -                                 "Thread Pool Worker-" + count++);
  +                     command,
  +                     "Thread Pool Worker-" + count++);
               }
            });
   
  @@ -155,36 +168,45 @@
   
      /**
       * Get a server session.
  -    *
  -    * @return    A server session.
       *
  -    * @throws JMSException    Failed to get a server session.
  +    * @return               A server session.
  +    * @throws JMSException  Failed to get a server session.
       */
      public ServerSession getServerSession() throws JMSException
      {
         log.debug("getting a server session");
         ServerSession session = null;
   
  -      try {
  -         while (true) {
  -            synchronized (sessionPool) {
  -               if(closing){
  +      try
  +      {
  +         while (true)
  +         {
  +            synchronized (sessionPool)
  +            {
  +               if (closing)
  +               {
                     throw new JMSException("Cannot get session after pool has been closed down.");
                  }
  -               else if (sessionPool.size() > 0) {
  +               else if (sessionPool.size() > 0)
  +               {
                     session = (ServerSession)sessionPool.remove(0);
                     break;
                  }
  -               else {
  -                  try {
  +               else
  +               {
  +                  try
  +                  {
                        sessionPool.wait();
                     }
  -                  catch (InterruptedException ignore) {}
  +                  catch (InterruptedException ignore)
  +                  {
  +                  }
                  }
               }
            }
         }
  -      catch (Exception e) {
  +      catch (Exception e)
  +      {
            throw new JMSException("Failed to get a server session: " + e);
         }
   
  @@ -194,59 +216,29 @@
         return session;
      }
   
  -   // --- Protected messages for StdServerSession to use
  -
      /**
  -    * Returns true if this server session is transacted.
  -    */
  -   boolean isTransacted() {
  -      return transacted;
  -   }
  -
  -   /**
  -    * Recycle a server session.
  -    */
  -   void recycle(StdServerSession session) {
  -      synchronized (sessionPool) {
  -         if(closing){
  -           session.close();
  -           numServerSessions--;
  -           if(numServerSessions == 0)  //notify clear thread.
  -              sessionPool.notifyAll();
  -         }else{
  -           sessionPool.add(session);
  -           sessionPool.notifyAll();
  -           log.debug("recycled server session: " + session);
  -         }
  -      }
  -   }
  -
  -   /**
  -    * Get the executor we are using.
  -    */
  -   Executor getExecutor() {
  -      return executor;
  -   }
  -
  -   /**
       * Clear the pool, clear out both threads and ServerSessions,
       * connection.stop() should be run before this method.
       */
  -   public void clear() {
  -      synchronized (sessionPool) {
  +   public void clear()
  +   {
  +      synchronized (sessionPool)
  +      {
            // FIXME - is there a runaway condition here. What if a
            // ServerSession are taken by a ConnecionConsumer? Should we set
            // a flag somehow so that no ServerSessions are recycled and the
            // ThreadPool won't leave any more threads out.
            closing = true;
   
  -         if (log.isDebugEnabled()) {
  +         if (log.isDebugEnabled())
  +         {
               log.debug("Clearing " + sessionPool.size() +
  -                      " from ServerSessionPool");
  +                  " from ServerSessionPool");
            }
   
            Iterator iter = sessionPool.iterator();
  -         while (iter.hasNext()) {
  +         while (iter.hasNext())
  +         {
               StdServerSession ses = (StdServerSession)iter.next();
               // Should we do anything to the server session?
               ses.close();
  @@ -261,9 +253,68 @@
         executor.shutdownAfterProcessingCurrentlyQueuedTasks();
   
         //wait for all server sessions to be returned.
  -      synchronized(sessionPool){
  -         while(numServerSessions > 0)
  -            try{ sessionPool.wait(); }catch(InterruptedException ignore){}
  +      synchronized (sessionPool)
  +      {
  +         while (numServerSessions > 0)
  +         {
  +            try
  +            {
  +               sessionPool.wait();
  +            }
  +            catch (InterruptedException ignore)
  +            {
  +            }
  +         }
  +      }
  +   }
  +
  +   /**
  +    * Get the executor we are using.
  +    *
  +    * @return   The Executor value
  +    */
  +   Executor getExecutor()
  +   {
  +      return executor;
  +   }
  +
  +   // --- Protected messages for StdServerSession to use
  +
  +   /**
  +    * Returns true if this server session is transacted.
  +    *
  +    * @return   The Transacted value
  +    */
  +   boolean isTransacted()
  +   {
  +      return transacted;
  +   }
  +
  +   /**
  +    * Recycle a server session.
  +    *
  +    * @param session  Description of Parameter
  +    */
  +   void recycle(StdServerSession session)
  +   {
  +      synchronized (sessionPool)
  +      {
  +         if (closing)
  +         {
  +            session.close();
  +            numServerSessions--;
  +            if (numServerSessions == 0)
  +            {
  +               //notify clear thread.
  +               sessionPool.notifyAll();
  +            }
  +         }
  +         else
  +         {
  +            sessionPool.add(session);
  +            sessionPool.notifyAll();
  +            log.debug("recycled server session: " + session);
  +         }
         }
      }
   
  @@ -271,32 +322,38 @@
   
      private void init() throws JMSException
      {
  -      for (int index = 0; index < poolSize; index++) {
  +      for (int index = 0; index < poolSize; index++)
  +      {
            // Here is the meat, that MUST follow the spec
            Session ses = null;
            XASession xaSes = null;
   
            log.debug("initializing with connection: " + con);
   
  -         if (con instanceof XATopicConnection) {
  +         if (con instanceof XATopicConnection)
  +         {
               xaSes = ((XATopicConnection)con).createXATopicSession();
               ses = ((XATopicSession)xaSes).getTopicSession();
            }
  -         else if (con instanceof XAQueueConnection) {
  +         else if (con instanceof XAQueueConnection)
  +         {
               xaSes = ((XAQueueConnection)con).createXAQueueSession();
               ses = ((XAQueueSession)xaSes).getQueueSession();
            }
  -         else if (con instanceof TopicConnection) {
  +         else if (con instanceof TopicConnection)
  +         {
               ses = ((TopicConnection)con).createTopicSession(transacted, ack);
               log.warn("Using a non-XA TopicConnection.  " +
  -                     "It will not be able to participate in a Global UOW");
  +                  "It will not be able to participate in a Global UOW");
            }
  -         else if (con instanceof QueueConnection) {
  +         else if (con instanceof QueueConnection)
  +         {
               ses = ((QueueConnection)con).createQueueSession(transacted, ack);
               log.warn("Using a non-XA QueueConnection.  " +
  -                     "It will not be able to participate in a Global UOW");
  +                  "It will not be able to participate in a Global UOW");
            }
  -         else {
  +         else
  +         {
               // should never happen really
               log.error("Connection was not reconizable: " + con);
               throw new JMSException("Connection was not reconizable: " + con);
  @@ -308,7 +365,7 @@
            ses.setMessageListener(listener);
   
            // create the server session and add it to the pool
  -         ServerSession serverSession = new StdServerSession(this, ses, xaSes);
  +         ServerSession serverSession = new StdServerSession(this, ses, xaSes, containerManaged);
            sessionPool.add(serverSession);
            numServerSessions++;
            log.debug("added server session to the pool: " + serverSession);
  
  
  
  1.6       +30 -45    jboss/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java
  
  Index: StdServerSessionPoolFactory.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- StdServerSessionPoolFactory.java  2001/07/21 04:18:28     1.5
  +++ StdServerSessionPoolFactory.java  2001/09/20 05:08:21     1.6
  @@ -1,93 +1,78 @@
   /*
  - * Copyright (c) 2000 Peter Antman DN <[EMAIL PROTECTED]>
  + * JBoss, the OpenSource J2EE webOS
    *
  - * This library is free software; you can redistribute it and/or
  - * modify it under the terms of the GNU Lesser General Public
  - * License as published by the Free Software Foundation; either
  - * version 2 of the License, or (at your option) any later version
  - * 
  - * This library is distributed in the hope that it will be useful,
  - * but WITHOUT ANY WARRANTY; without even the implied warranty of
  - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  - * Lesser General Public License for more details.
  - * 
  - * You should have received a copy of the GNU Lesser General Public
  - * License along with this library; if not, write to the Free Software
  - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  + * Distributable under LGPL license.
  + * See terms of license at gnu.org.
    */
   package org.jboss.jms.asf;
   
   import java.io.Serializable;
  -
  -import javax.jms.ServerSessionPool;
  -import javax.jms.MessageListener;
   import javax.jms.Connection;
   import javax.jms.JMSException;
  +import javax.jms.MessageListener;
   
  +import javax.jms.ServerSessionPool;
  +
   /**
  - * An implementation of ServerSessionPoolFactory.
  + * An implementation of ServerSessionPoolFactory. <p>
    *
  - * <p>Created: Fri Dec 22 09:47:41 2000
  + * Created: Fri Dec 22 09:47:41 2000
    *
  - * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
  - * @version $Revision: 1.5 $
  + * @author    <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> .
  + * @version   $Revision: 1.6 $
    */
   public class StdServerSessionPoolFactory
  -   implements ServerSessionPoolFactory, Serializable
  +       implements ServerSessionPoolFactory, Serializable
   {
  -   /** The name of this factory. */
  +   /**
  +    * The name of this factory.
  +    */
      private String name;
   
      /**
  -    * Construct a <tt>StdServerSessionPoolFactory</tt>.
  +    * Construct a <tt>StdServerSessionPoolFactory</tt> .
       */
  -   public StdServerSessionPoolFactory() {
  +   public StdServerSessionPoolFactory()
  +   {
         super();
      }
   
      /**
       * Set the name of the factory.
       *
  -    * @param name    The name of the factory.
  +    * @param name  The name of the factory.
       */
  -   public void setName(final String name) {
  +   public void setName(final String name)
  +   {
         this.name = name;
      }
  -   
  +
      /**
       * Get the name of the factory.
       *
  -    * @return    The name of the factory.
  +    * @return   The name of the factory.
       */
  -   public String getName() {
  +   public String getName()
  +   {
         return name;
      }
   
      /**
  -    * Create a new <tt>ServerSessionPool</tt>.
  +    * Create a new <tt>ServerSessionPool</tt> .
       *
       * @param con
       * @param maxSession
       * @param isTransacted
       * @param ack
       * @param listener
  -    * @return                A new pool.
  -    *
  +    * @param isContainerManaged          Description of Parameter
  +    * @return                            A new pool.
       * @throws JMSException
  +    * @exception javax.jms.JMSException  Description of Exception
       */
  -   public ServerSessionPool getServerSessionPool(final Connection con,
  -                                                 final int maxSession,
  -                                                 final boolean isTransacted,
  -                                                 final int ack,
  -                                                 final MessageListener listener)
  -      throws JMSException
  +   public javax.jms.ServerSessionPool getServerSessionPool(javax.jms.Connection con, int maxSession, boolean isTransacted, int ack, boolean isContainerManaged, javax.jms.MessageListener listener) throws javax.jms.JMSException
      {
  -      ServerSessionPool pool = (ServerSessionPool)
  -         new StdServerSessionPool(con,
  -                                  isTransacted,
  -                                  ack,
  -                                  listener,
  -                                  maxSession);
  +      ServerSessionPool pool = (ServerSessionPool)new StdServerSessionPool(con, 
isTransacted, ack, isContainerManaged, listener, maxSession);
         return pool;
      }
   }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to