User: chirino
Date: 01/09/19 22:08:21
Modified: src/main/org/jboss/jms/asf ServerSessionPoolFactory.java
StdServerSession.java StdServerSessionPool.java
StdServerSessionPoolFactory.java
Log:
Transaction Timeout Fix: an MDB that took a long time to process a message
would time out the transaction. We now use the JBossMQ XAResource directly
to manage the transaction for BMT beans.
Revision Changes Path
1.3 +19 -29 jboss/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java
Index: ServerSessionPoolFactory.java
===================================================================
RCS file:
/cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- ServerSessionPoolFactory.java 2001/07/21 04:18:28 1.2
+++ ServerSessionPoolFactory.java 2001/09/20 05:08:21 1.3
@@ -1,67 +1,57 @@
/*
- * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
+ * JBoss, the OpenSource J2EE webOS
*
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.ServerSessionPool;
-import javax.jms.JMSException;
/**
- * Defines the model for creating <tt>ServerSessionPoolFactory</tt> objects.
+ * Defines the model for creating <tt>ServerSessionPoolFactory</tt> objects. <p>
*
- * <p>Created: Wed Nov 29 15:55:21 2000
+ * Created: Wed Nov 29 15:55:21 2000
*
- * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
- * @version $Revision: 1.2 $
+ * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> .
+ * @version $Revision: 1.3 $
*/
public interface ServerSessionPoolFactory
{
/**
* Set the name of the factory.
*
- * @param name The name of the factory.
+ * @param name The name of the factory.
*/
void setName(String name);
/**
* Get the name of the factory.
*
- * @return The name of the factory.
+ * @return The name of the factory.
*/
String getName();
/**
- * Create a new <tt>ServerSessionPool</tt>.
+ * Create a new <tt>ServerSessionPool</tt> .
*
* @param con
* @param maxSession
* @param isTransacted
* @param ack
* @param listener
- * @return A new pool.
- *
+ * @param isContainerManaged Description of Parameter
+ * @return A new pool.
* @throws JMSException
*/
ServerSessionPool getServerSessionPool(Connection con,
- int maxSession,
- boolean isTransacted,
- int ack,
- MessageListener listener)
- throws JMSException;
+ int maxSession,
+ boolean isTransacted,
+ int ack,
+ boolean isContainerManaged,
+ MessageListener listener)
+ throws JMSException;
}
1.8 +307 -150 jboss/src/main/org/jboss/jms/asf/StdServerSession.java
Index: StdServerSession.java
===================================================================
RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSession.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- StdServerSession.java 2001/07/21 20:27:13 1.7
+++ StdServerSession.java 2001/09/20 05:08:21 1.8
@@ -1,21 +1,11 @@
/*
- * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
+ * JBoss, the OpenSource J2EE webOS
*
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
+import java.lang.reflect.Method;
import javax.jms.JMSException;
import javax.jms.ServerSession;
@@ -34,225 +24,392 @@
import org.jboss.tm.TransactionManagerService;
/**
- * An implementation of ServerSession.
+ * An implementation of ServerSession. <p>
*
- * <p>Created: Thu Dec 7 18:25:40 2000
+ * Created: Thu Dec 7 18:25:40 2000
*
- * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
- * @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a>
- * @version $Revision: 1.7 $
+ * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> .
+ * @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a>
+ * @version $Revision: 1.8 $
*/
public class StdServerSession
- implements Runnable, ServerSession
+ implements Runnable, ServerSession
{
- /** Instance logger. */
+ /**
+ * Instance logger.
+ */
private final Category log = Category.getInstance(this.getClass());
- /** The server session pool which we belong to. */
- private StdServerSessionPool serverSessionPool; // = null;
+ /**
+ * The server session pool which we belong to.
+ */
+ private StdServerSessionPool serverSessionPool;
+ // = null;
- /** Our session resource. */
- private Session session; // = null;
+ /**
+ * Our session resource.
+ */
+ private Session session;
+ // = null;
- /** Our XA session resource. */
- private XASession xaSession; // = null;
+ /**
+ * Our XA session resource.
+ */
+ private XASession xaSession;
+ // = null;
- /** The transaction manager that we will use for transactions. */
+ /**
+ * The transaction manager that we will use for transactions.
+ */
private TransactionManager tm;
/**
- * Create a <tt>StdServerSession</tt>.
- *
- * @param pool The server session pool which we belong to.
- * @param session Our session resource.
- * @param xaSession Our XA session resource.
+ * Use the session's XAResource directly if we have a JBossMQ XASession.
+ * This allows us to get around the TX timeout problem when you have
+ * extensive message processing.
+ */
+ private boolean useXAResouceDirectly;
+
+ /**
+ * Create a <tt>StdServerSession</tt> .
*
- * @throws JMSException Transation manager was not found.
+ * @param pool The server session pool which we belong to.
+ * @param session Our session resource.
+ * @param xaSession Our XA session resource.
+ * @param containerManaged Description of Parameter
+ * @throws JMSException Transation manager was not found.
+ * @exception JMSException Description of Exception
*/
StdServerSession(final StdServerSessionPool pool,
- final Session session,
- final XASession xaSession)
- throws JMSException
+ final Session session,
+ final XASession xaSession,
+ final boolean containerManaged)
+ throws JMSException
{
// assert pool != null
// assert session != null
-
+
this.serverSessionPool = pool;
this.session = session;
this.xaSession = xaSession;
- if (log.isDebugEnabled()) {
- log.debug("initializing (pool, session, xaSession): " +
- pool + ", " + session + ", " + xaSession);
+ try
+ {
+ this.useXAResouceDirectly = !containerManaged &&
Class.forName("org.jboss.mq.SpySession").isAssignableFrom(session.getClass());
+ }
+ catch (ClassNotFoundException e)
+ {
+ this.useXAResouceDirectly = false;
}
-
+
+ log.debug("initializing (pool, session, xaSession, useXAResouceDirectly): " +
+ pool + ", " + session + ", " + xaSession + ", " + useXAResouceDirectly);
+
InitialContext ctx = null;
- try {
+ try
+ {
ctx = new InitialContext();
tm = (TransactionManager)
- ctx.lookup(TransactionManagerService.JNDI_NAME);
+ ctx.lookup(TransactionManagerService.JNDI_NAME);
}
- catch (Exception e) {
+ catch (Exception e)
+ {
throw new JMSException("Transation manager was not found");
}
- finally {
- if (ctx != null) {
- try {
+ finally
+ {
+ if (ctx != null)
+ {
+ try
+ {
ctx.close();
+ }
+ catch (Exception ignore)
+ {
}
- catch (Exception ignore) {}
}
}
}
// --- Impl of JMS standard API
-
+
/**
- * Returns the session.
+ * Returns the session. <p>
*
- * <p>This simply returns what it has fetched from the connection. It is
- * up to the jms provider to typecast it and have a private API to stuff
- * messages into it.
+ * This simply returns what it has fetched from the connection. It is up to
+ * the jms provider to typecast it and have a private API to stuff messages
+ * into it.
*
- * @return The session.
+ * @return The session.
+ * @exception JMSException Description of Exception
*/
public Session getSession() throws JMSException
{
return session;
}
- /**
- * Start the session and begin consuming messages.
- *
- * @throws JMSException No listener has been specified.
- */
- public void start() throws JMSException {
- log.debug("starting invokes on server session");
-
- if (session != null) {
- try {
- serverSessionPool.getExecutor().execute(this);
- }
- catch (InterruptedException ignore) {}
- }
- else {
- throw new JMSException("No listener has been specified");
- }
- }
-
//--- Protected parts, used by other in the package
-
+
/**
- * Runs in an own thread, basically calls the session.run(), it is up
- * to the session to have been filled with messages and it will run
- * against the listener set in StdServerSessionPool. When it has send
- * all its messages it returns.
- *
- * HC: run() also starts a transaction with the TransactionManager and
- * enlists the XAResource of the JMS XASession if a XASession was
- * available. A good JMS implementation should provide the XASession
- * for use in the ASF. So we optimize for the case where we have an
- * XASession. So, for the case where we do not have an XASession and
- * the bean is not transacted, we have the unneeded overhead of creating
- * a Transaction. I'm leaving it this way since it keeps the code simpler
- * and that case should not be too common (JBossMQ provides XASessions).
+ * Runs in an own thread, basically calls the session.run(), it is up to the
+ * session to have been filled with messages and it will run against the
+ * listener set in StdServerSessionPool. When it has send all its messages it
+ * returns. HC: run() also starts a transaction with the TransactionManager
+ * and enlists the XAResource of the JMS XASession if a XASession was
+ * available. A good JMS implementation should provide the XASession for use
+ * in the ASF. So we optimize for the case where we have an XASession. So,
+ * for the case where we do not have an XASession and the bean is not
+ * transacted, we have the unneeded overhead of creating a Transaction. I'm
+ * leaving it this way since it keeps the code simpler and that case should
+ * not be too common (JBossMQ provides XASessions).
*/
- public void run() {
+ public void run()
+ {
log.debug("running...");
-
+
+ log.info("running (pool, session, xaSession, useXAResouceDirectly): " +
+ ", " + session + ", " + xaSession + ", " + useXAResouceDirectly);
+
+ // Used if run with useXAResouceDirectly if true
+ JBossMQTXInterface jbossMQTXInterface = null;
+
+ // Used if run with useXAResouceDirectly if false
Transaction trans = null;
- try {
- tm.begin();
- trans = tm.getTransaction();
-
- if (xaSession != null) {
- XAResource res = xaSession.getXAResource();
- trans.enlistResource(res);
- if (log.isDebugEnabled()) {
- log.debug("XAResource '"+res+"' enlisted.");
+ try
+ {
+
+ if (useXAResouceDirectly)
+ {
+ // Use JBossMQ One Phase Commit to commit the TX
+ jbossMQTXInterface = new JBossMQTXInterface(session);
+ jbossMQTXInterface.startTX();
+
+ }
+ else
+ {
+
+ // Use the TM to control the TX
+ tm.begin();
+ trans = tm.getTransaction();
+
+ if (xaSession != null)
+ {
+ XAResource res = xaSession.getXAResource();
+ trans.enlistResource(res);
+ if (log.isDebugEnabled())
+ {
+ log.debug("XAResource '" + res + "' enlisted.");
+ }
}
- }
+ }
+ //currentTransactionId = connection.spyXAResourceManager.startTx();
// run the session
session.run();
}
- catch (Exception e) {
+ catch (Exception e)
+ {
log.error("session failed to run; setting rollback only", e);
-
- try {
- // The transaction will be rolledback in the finally
- trans.setRollbackOnly();
- }
- catch (Exception x) {
- log.error("failed to set rollback only", x);
- }
- }
- finally {
- try {
- // Marked rollback
- if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
- log.info("Rolling back JMS transaction");
- // actually roll it back
- trans.rollback();
-
- // NO XASession? then manually rollback.
- // This is not so good but
- // it's the best we can do if we have no XASession.
- if (xaSession == null && serverSessionPool.isTransacted()) {
- session.rollback();
+
+ if (useXAResouceDirectly)
+ {
+ // Use JBossMQ One Phase Commit to commit the TX
+ jbossMQTXInterface.setRollbackOnly();
+ }
+ else
+ {
+
+ // Mark the TX for rollback via the TM
+ try
+ {
+ // The transaction will be rolledback in the finally
+ trans.setRollbackOnly();
+ }
+ catch (Exception x)
+ {
+ log.error("failed to set rollback only", x);
+ }
+ }
+
+ }
+ finally
+ {
+ try
+ {
+ if (useXAResouceDirectly)
+ {
+ // Use JBossMQ One Phase Commit to commit the TX
+ jbossMQTXInterface.endTX();
+
+ }
+ else
+ {
+ // Use the TM to commit the Tx
+
+ // Marked rollback
+ if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
+ {
+ log.info("Rolling back JMS transaction");
+ // actually roll it back
+ trans.rollback();
+
+ // NO XASession? then manually rollback.
+ // This is not so good but
+ // it's the best we can do if we have no XASession.
+ if (xaSession == null && serverSessionPool.isTransacted())
+ {
+ session.rollback();
+ }
}
- } else if (trans.getStatus() == Status.STATUS_ACTIVE) {
- // Commit tx
- // This will happen if
- // a) everything goes well
- // b) app. exception was thrown
- trans.commit();
-
- // NO XASession? then manually commit. This is not so good but
- // it's the best we can do if we have no XASession.
- if (xaSession == null && serverSessionPool.isTransacted()) {
- session.commit();
+ else if (trans.getStatus() == Status.STATUS_ACTIVE)
+ {
+ // Commit tx
+ // This will happen if
+ // a) everything goes well
+ // b) app. exception was thrown
+ trans.commit();
+
+ // NO XASession? then manually commit. This is not so good but
+ // it's the best we can do if we have no XASession.
+ if (xaSession == null && serverSessionPool.isTransacted())
+ {
+ session.commit();
+ }
}
}
+
}
- catch (Exception e) {
+ catch (Exception e)
+ {
log.error("failed to commit/rollback", e);
}
-
+
StdServerSession.this.recycle();
}
log.debug("done");
}
-
+
/**
- * This method is called by the ServerSessionPool when it is ready to
- * be recycled intot the pool
+ * Start the session and begin consuming messages.
+ *
+ * @throws JMSException No listener has been specified.
*/
- void recycle()
+ public void start() throws JMSException
{
- serverSessionPool.recycle(this);
+ log.debug("starting invokes on server session");
+
+ if (session != null)
+ {
+ try
+ {
+ serverSessionPool.getExecutor().execute(this);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+ else
+ {
+ throw new JMSException("No listener has been specified");
+ }
}
/**
* Called by the ServerSessionPool when the sessions should be closed.
*/
- void close() {
- if (session != null) {
- try {
+ void close()
+ {
+ if (session != null)
+ {
+ try
+ {
session.close();
- } catch (Exception ignore) {}
-
+ }
+ catch (Exception ignore)
+ {
+ }
+
session = null;
}
-
- if (xaSession != null) {
- try {
+
+ if (xaSession != null)
+ {
+ try
+ {
xaSession.close();
- } catch (Exception ignore) {}
+ }
+ catch (Exception ignore)
+ {
+ }
xaSession = null;
}
log.debug("closed");
+ }
+
+ /**
+ * This method is called by the ServerSessionPool when it is ready to be
+ * recycled into the pool.
+ */
+ void recycle()
+ {
+ serverSessionPool.recycle(this);
+ }
+
+
+ /**
+ * #Description of the Class
+ */
+ private static class JBossMQTXInterface
+ {
+
+ static boolean initialzied = false;
+ static Method getXAResourceManager;
+ static Method startTx;
+ static Method endTx;
+ static Method commit;
+ static Method rollback;
+ boolean doRollback = false;
+ Object xid = null;
+ Object spyXAResourceManager = null;
+
+ JBossMQTXInterface(Session sess) throws Exception
+ {
+ if (!initialzied)
+ {
+ getXAResourceManager =
Class.forName("org.jboss.mq.SpySession").getMethod("getXAResourceManager", new
Class[]{});
+ startTx =
Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("startTx", new Class[]{});
+ endTx =
Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("endTx", new
Class[]{Object.class, boolean.class});
+ commit =
Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("commit", new
Class[]{Object.class, boolean.class});
+ rollback =
Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("rollback", new
Class[]{Object.class});
+ initialzied = true;
+ }
+ spyXAResourceManager = getXAResourceManager.invoke(sess, new Object[]{});
+ }
+
+ void setRollbackOnly()
+ {
+ doRollback = true;
+ }
+
+ void startTX() throws Exception
+ {
+ xid = startTx.invoke(spyXAResourceManager, new Object[]{});
+ }
+
+ void endTX() throws Exception
+ {
+ if (doRollback)
+ {
+ endTx.invoke(spyXAResourceManager, new Object[]{xid, new
Boolean(true)});
+ rollback.invoke(spyXAResourceManager, new Object[]{xid});
+ }
+ else
+ {
+ endTx.invoke(spyXAResourceManager, new Object[]{xid, new
Boolean(true)});
+ commit.invoke(spyXAResourceManager, new Object[]{xid, new
Boolean(true)});
+ }
+ }
}
}
1.13 +193 -136 jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java
Index: StdServerSessionPool.java
===================================================================
RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- StdServerSessionPool.java 2001/08/17 22:22:55 1.12
+++ StdServerSessionPool.java 2001/09/20 05:08:21 1.13
@@ -1,128 +1,131 @@
/*
- * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
+ * JBoss, the OpenSource J2EE webOS
*
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
+import EDU.oswego.cs.dl.util.concurrent.Executor;
-import java.util.List;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
+
import javax.jms.Connection;
import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import javax.jms.QueueConnection;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
-import javax.jms.MessageListener;
+import javax.jms.Session;
import javax.jms.TopicConnection;
-import javax.jms.XATopicConnection;
-import javax.jms.QueueConnection;
import javax.jms.XAQueueConnection;
-import javax.jms.Session;
-import javax.jms.XASession;
import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicConnection;
import javax.jms.XATopicSession;
-import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
-
import org.apache.log4j.Category;
/**
- * Implementation of ServerSessionPool.
+ * Implementation of ServerSessionPool. <p>
*
- * <p>Created: Thu Dec 7 17:02:03 2000
+ * Created: Thu Dec 7 17:02:03 2000
*
- * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
- * @version $Revision: 1.12 $
+ * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> .
+ * @version $Revision: 1.13 $
*/
public class StdServerSessionPool
- implements ServerSessionPool
+ implements ServerSessionPool
{
- /** The default size of the pool. */
- private static final int DEFAULT_POOL_SIZE = 15;
+ /**
+ * The default size of the pool.
+ */
+ private final static int DEFAULT_POOL_SIZE = 15;
- /** The thread group which session workers will run. */
+ /**
+ * The thread group which session workers will run.
+ */
private static ThreadGroup threadGroup =
- new ThreadGroup("ASF Session Pool Threads");
+ new ThreadGroup("ASF Session Pool Threads");
- /** Instance logger. */
+ /**
+ * Instance logger.
+ */
private final Category log = Category.getInstance(this.getClass());
- /** The size of the pool. */
+ /**
+ * The size of the pool.
+ */
private int poolSize;
- /** The message acknowledgment mode. */
+ /**
+ * The message acknowledgment mode.
+ */
private int ack;
+
+ /**
+ * Is the bean container managed?
+ */
+ private boolean containerManaged;
- /** True if this is a transacted session. */
+ /**
+ * True if this is a transacted session.
+ */
private boolean transacted;
- /** The session connection. */
+ /**
+ * The session connection.
+ */
private Connection con;
- /** The message listener for the session. */
+ /**
+ * The message listener for the session.
+ */
private MessageListener listener;
- /** The list of ServerSessions. */
+ /**
+ * The list of ServerSessions.
+ */
private List sessionPool;
- /** The executor for processing messages? */
+ /**
+ * The executor for processing messages?
+ */
private PooledExecutor executor;
- /** Used to signal when the Pool is being closed down */
+ /**
+ * Used to signal when the Pool is being closed down
+ */
private boolean closing = false;
- /** Used during close down to wait for all server sessions to be returned and
closed.*/
- private int numServerSessions = 0;
-
/**
- * Construct a <tt>StdServerSessionPool</tt> using the default
- * pool size.
- *
- * @param con
- * @param transacted
- * @param ack
- * @param listener
+ * Used during close down to wait for all server sessions to be returned and
+ * closed.
*/
- public StdServerSessionPool(final Connection con,
- final boolean transacted,
- final int ack,
- final MessageListener listener)
- throws JMSException
- {
- this(con, transacted, ack, listener, DEFAULT_POOL_SIZE);
- }
+ private int numServerSessions = 0;
/**
- * Construct a <tt>StdServerSessionPool</tt> using the default
- * pool size.
+ * Construct a <tt>StdServerSessionPool</tt> using the default pool size.
*
* @param con
* @param transacted
* @param ack
* @param listener
* @param maxSession
+ * @param isContainerManaged Description of Parameter
+ * @exception JMSException Description of Exception
*/
public StdServerSessionPool(final Connection con,
- final boolean transacted,
- final int ack,
- final MessageListener listener,
- final int maxSession)
- throws JMSException
+ final boolean transacted,
+ final int ack,
+ final boolean isContainerManaged,
+ final MessageListener listener,
+ final int maxSession)
+ throws JMSException
{
this.con = con;
this.ack = ack;
@@ -130,19 +133,29 @@
this.transacted = transacted;
this.poolSize = maxSession;
this.sessionPool = new ArrayList(maxSession);
+ this.containerManaged = isContainerManaged;
// setup the worker pool
executor = new PooledExecutor(poolSize);
executor.setMinimumPoolSize(0);
- executor.setKeepAliveTime(1000*30);
+ executor.setKeepAliveTime(1000 * 30);
executor.waitWhenBlocked();
- executor.setThreadFactory(new ThreadFactory() {
+ executor.setThreadFactory(
+ new ThreadFactory()
+ {
private volatile int count = 0;
-
- public Thread newThread(final Runnable command) {
+
+ /**
+ * #Description of the Method
+ *
+ * @param command Description of Parameter
+ * @return Description of the Returned Value
+ */
+ public Thread newThread(final Runnable command)
+ {
return new Thread(threadGroup,
- command,
- "Thread Pool Worker-" + count++);
+ command,
+ "Thread Pool Worker-" + count++);
}
});
@@ -155,36 +168,45 @@
/**
* Get a server session.
- *
- * @return A server session.
*
- * @throws JMSException Failed to get a server session.
+ * @return A server session.
+ * @throws JMSException Failed to get a server session.
*/
public ServerSession getServerSession() throws JMSException
{
log.debug("getting a server session");
ServerSession session = null;
- try {
- while (true) {
- synchronized (sessionPool) {
- if(closing){
+ try
+ {
+ while (true)
+ {
+ synchronized (sessionPool)
+ {
+ if (closing)
+ {
throw new JMSException("Cannot get session after pool has been
closed down.");
}
- else if (sessionPool.size() > 0) {
+ else if (sessionPool.size() > 0)
+ {
session = (ServerSession)sessionPool.remove(0);
break;
}
- else {
- try {
+ else
+ {
+ try
+ {
sessionPool.wait();
}
- catch (InterruptedException ignore) {}
+ catch (InterruptedException ignore)
+ {
+ }
}
}
}
}
- catch (Exception e) {
+ catch (Exception e)
+ {
throw new JMSException("Failed to get a server session: " + e);
}
@@ -194,59 +216,29 @@
return session;
}
- // --- Protected messages for StdServerSession to use
-
/**
- * Returns true if this server session is transacted.
- */
- boolean isTransacted() {
- return transacted;
- }
-
- /**
- * Recycle a server session.
- */
- void recycle(StdServerSession session) {
- synchronized (sessionPool) {
- if(closing){
- session.close();
- numServerSessions--;
- if(numServerSessions == 0) //notify clear thread.
- sessionPool.notifyAll();
- }else{
- sessionPool.add(session);
- sessionPool.notifyAll();
- log.debug("recycled server session: " + session);
- }
- }
- }
-
- /**
- * Get the executor we are using.
- */
- Executor getExecutor() {
- return executor;
- }
-
- /**
* Clear the pool, clear out both threads and ServerSessions,
* connection.stop() should be run before this method.
*/
- public void clear() {
- synchronized (sessionPool) {
+ public void clear()
+ {
+ synchronized (sessionPool)
+ {
// FIXME - is there a runaway condition here. What if a
// ServerSession are taken by a ConnecionConsumer? Should we set
// a flag somehow so that no ServerSessions are recycled and the
// ThreadPool won't leave any more threads out.
closing = true;
- if (log.isDebugEnabled()) {
+ if (log.isDebugEnabled())
+ {
log.debug("Clearing " + sessionPool.size() +
- " from ServerSessionPool");
+ " from ServerSessionPool");
}
Iterator iter = sessionPool.iterator();
- while (iter.hasNext()) {
+ while (iter.hasNext())
+ {
StdServerSession ses = (StdServerSession)iter.next();
// Should we do anything to the server session?
ses.close();
@@ -261,9 +253,68 @@
executor.shutdownAfterProcessingCurrentlyQueuedTasks();
//wait for all server sessions to be returned.
- synchronized(sessionPool){
- while(numServerSessions > 0)
- try{ sessionPool.wait(); }catch(InterruptedException ignore){}
+ synchronized (sessionPool)
+ {
+ while (numServerSessions > 0)
+ {
+ try
+ {
+ sessionPool.wait();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the executor we are using.
+ *
+ * @return The Executor value
+ */
+ Executor getExecutor()
+ {
+ return executor;
+ }
+
+ // --- Protected messages for StdServerSession to use
+
+ /**
+ * Returns true if this server session is transacted.
+ *
+ * @return The Transacted value
+ */
+ boolean isTransacted()
+ {
+ return transacted;
+ }
+
+ /**
+ * Recycle a server session.
+ *
+ * @param session Description of Parameter
+ */
+ void recycle(StdServerSession session)
+ {
+ synchronized (sessionPool)
+ {
+ if (closing)
+ {
+ session.close();
+ numServerSessions--;
+ if (numServerSessions == 0)
+ {
+ //notify clear thread.
+ sessionPool.notifyAll();
+ }
+ }
+ else
+ {
+ sessionPool.add(session);
+ sessionPool.notifyAll();
+ log.debug("recycled server session: " + session);
+ }
}
}
@@ -271,32 +322,38 @@
private void init() throws JMSException
{
- for (int index = 0; index < poolSize; index++) {
+ for (int index = 0; index < poolSize; index++)
+ {
// Here is the meat, that MUST follow the spec
Session ses = null;
XASession xaSes = null;
log.debug("initializing with connection: " + con);
- if (con instanceof XATopicConnection) {
+ if (con instanceof XATopicConnection)
+ {
xaSes = ((XATopicConnection)con).createXATopicSession();
ses = ((XATopicSession)xaSes).getTopicSession();
}
- else if (con instanceof XAQueueConnection) {
+ else if (con instanceof XAQueueConnection)
+ {
xaSes = ((XAQueueConnection)con).createXAQueueSession();
ses = ((XAQueueSession)xaSes).getQueueSession();
}
- else if (con instanceof TopicConnection) {
+ else if (con instanceof TopicConnection)
+ {
ses = ((TopicConnection)con).createTopicSession(transacted, ack);
log.warn("Using a non-XA TopicConnection. " +
- "It will not be able to participate in a Global UOW");
+ "It will not be able to participate in a Global UOW");
}
- else if (con instanceof QueueConnection) {
+ else if (con instanceof QueueConnection)
+ {
ses = ((QueueConnection)con).createQueueSession(transacted, ack);
log.warn("Using a non-XA QueueConnection. " +
- "It will not be able to participate in a Global UOW");
+ "It will not be able to participate in a Global UOW");
}
- else {
+ else
+ {
// should never happen really
log.error("Connection was not reconizable: " + con);
throw new JMSException("Connection was not reconizable: " + con);
@@ -308,7 +365,7 @@
ses.setMessageListener(listener);
// create the server session and add it to the pool
- ServerSession serverSession = new StdServerSession(this, ses, xaSes);
+ ServerSession serverSession = new StdServerSession(this, ses, xaSes,
containerManaged);
sessionPool.add(serverSession);
numServerSessions++;
log.debug("added server session to the pool: " + serverSession);
1.6 +30 -45
jboss/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java
Index: StdServerSessionPoolFactory.java
===================================================================
RCS file:
/cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- StdServerSessionPoolFactory.java 2001/07/21 04:18:28 1.5
+++ StdServerSessionPoolFactory.java 2001/09/20 05:08:21 1.6
@@ -1,93 +1,78 @@
/*
- * Copyright (c) 2000 Peter Antman DN <[EMAIL PROTECTED]>
+ * JBoss, the OpenSource J2EE webOS
*
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
import java.io.Serializable;
-
-import javax.jms.ServerSessionPool;
-import javax.jms.MessageListener;
import javax.jms.Connection;
import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import javax.jms.ServerSessionPool;
+
/**
- * An implementation of ServerSessionPoolFactory.
+ * An implementation of ServerSessionPoolFactory. <p>
*
- * <p>Created: Fri Dec 22 09:47:41 2000
+ * Created: Fri Dec 22 09:47:41 2000
*
- * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
- * @version $Revision: 1.5 $
+ * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> .
+ * @version $Revision: 1.6 $
*/
public class StdServerSessionPoolFactory
- implements ServerSessionPoolFactory, Serializable
+ implements ServerSessionPoolFactory, Serializable
{
- /** The name of this factory. */
+ /**
+ * The name of this factory.
+ */
private String name;
/**
- * Construct a <tt>StdServerSessionPoolFactory</tt>.
+ * Construct a <tt>StdServerSessionPoolFactory</tt> .
*/
- public StdServerSessionPoolFactory() {
+ public StdServerSessionPoolFactory()
+ {
super();
}
/**
* Set the name of the factory.
*
- * @param name The name of the factory.
+ * @param name The name of the factory.
*/
- public void setName(final String name) {
+ public void setName(final String name)
+ {
this.name = name;
}
-
+
/**
* Get the name of the factory.
*
- * @return The name of the factory.
+ * @return The name of the factory.
*/
- public String getName() {
+ public String getName()
+ {
return name;
}
/**
- * Create a new <tt>ServerSessionPool</tt>.
+ * Create a new <tt>ServerSessionPool</tt> .
*
* @param con
* @param maxSession
* @param isTransacted
* @param ack
* @param listener
- * @return A new pool.
- *
+ * @param isContainerManaged Description of Parameter
+ * @return A new pool.
* @throws JMSException
+ * @exception javax.jms.JMSException Description of Exception
*/
- public ServerSessionPool getServerSessionPool(final Connection con,
- final int maxSession,
- final boolean isTransacted,
- final int ack,
- final MessageListener listener)
- throws JMSException
+ public javax.jms.ServerSessionPool getServerSessionPool(javax.jms.Connection
con, int maxSession, boolean isTransacted, int ack, boolean isContainerManaged,
javax.jms.MessageListener listener) throws javax.jms.JMSException
{
- ServerSessionPool pool = (ServerSessionPool)
- new StdServerSessionPool(con,
- isTransacted,
- ack,
- listener,
- maxSession);
+ ServerSessionPool pool = (ServerSessionPool)new StdServerSessionPool(con,
isTransacted, ack, isContainerManaged, listener, maxSession);
return pool;
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development