Author: asankha Date: Fri Nov 21 09:34:18 2008 New Revision: 719648 URL: http://svn.apache.org/viewvc?rev=719648&view=rev Log: fix connection, session and consumer caching as pointed out by andreas; fix problem in minconcurrency test for proper operation
Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=719648&r1=719647&r2=719648&view=diff ============================================================================== --- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original) +++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Fri Nov 21 09:34:18 2008 @@ -132,15 +132,9 @@ contentTypeProperty = jmsOut.getContentTypeProperty(); } - if (messageSender.getCacheLevel() < JMSConstants.CACHE_SESSION) { - // only connection has been cached at most + // need to synchronize as Sessions are not thread safe + synchronized (messageSender.getSession()) { sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut); - - } else { - // need to synchronize as Sessions are not thread safe - synchronized (messageSender.getSession()) { - sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut); - } } } Modified: 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java?rev=719648&r1=719647&r2=719648&view=diff ============================================================================== --- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java (original) +++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java Fri Nov 21 09:34:18 2008 @@ -48,7 +48,7 @@ * to re-connect. Thus a connection failure for a single task, will re-initialize the state afresh * for the service, by discarding all connections. */ -public class ServiceTaskManager implements ExceptionListener { +public class ServiceTaskManager { /** The logger */ private static final Log log = LogFactory.getLog(ServiceTaskManager.class); @@ -122,12 +122,6 @@ private Context context = null; /** The ConnectionFactory to be used */ private ConnectionFactory conFactory = null; - /** The shared JMS Connection opened */ - private Connection sharedConnection = null; - /** The shared JMS Connection opened */ - private Session sharedSession = null; - /** The shared JMS Connection opened */ - private MessageConsumer sharedConsumer = null; /** The JMS Destination */ private Destination destination = null; @@ -144,41 +138,6 @@ /** The shared thread pool from the Listener */ private WorkerPool workerPool = null; - /** Handle JMS Connection exceptions by re-initializing. 
A single connection failure could - * cause re-initialization of multiple MessageListenerTasks / Connections - */ - public void onException(JMSException j) { - - if (!isActive()) { - return; - } - - // if we failed while active, update state to show failure - state = STATE_FAILURE; - log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks", j); - - int r = 1; - long retryDuration = initialReconnectDuration; - - do { - try { - log.info("Reconnection attempt : " + r + " for service : " + serviceName); - start(); - } catch (Exception e) { - log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName + - " failed. Next retry in " + (retryDuration/1000) + "seconds", e); - retryDuration = (long) (retryDuration * reconnectionProgressionFactor); - if (retryDuration > maxReconnectDuration) { - retryDuration = maxReconnectDuration; - } - - try { - Thread.sleep(retryDuration); - } catch (InterruptedException ignore) {} - } - } while (!isActive()); - } - /** * Start or re-start the Task Manager by shutting down any existing worker tasks and * re-creating them. However, if this is STM is PAUSED, a start request is ignored. 
@@ -208,16 +167,16 @@ "worker tasks of service : " + serviceName); break; case JMSConstants.CACHE_CONNECTION: - log.debug("Only the JMS Connection will be cached and shared between poller " + - "tasks of service : " + serviceName); + log.debug("Only the JMS Connection will be cached and shared between successive " + + "poller task invocations"); break; case JMSConstants.CACHE_SESSION: log.debug("The JMS Connection and Session will be cached and shared between " + - "poller tasks of service : " + serviceName); + "successive poller task invocations"); break; case JMSConstants.CACHE_CONSUMER: log.debug("The JMS Connection, Session and MessageConsumer will be cached and " + - "shared between poller tasks of service : " + serviceName); + "shared between successive poller task invocations"); break; default : { handleException("Invalid cache level : " + cacheLevel + @@ -266,42 +225,6 @@ log.warn("Unable to shutdown all polling tasks of service : " + serviceName); } - if (sharedConsumer != null) { - log.debug("Closing shared Consumer - service : " + serviceName); - try { - sharedConsumer.close(); - } catch (IllegalStateException ignore) { - } catch (JMSException e) { - logError("Error closing shared JMS consumer", e); - } finally { - sharedConsumer = null; - } - } - - if (sharedSession != null) { - log.debug("Closing shared Session - service : " + serviceName); - try { - sharedSession.close(); - } catch (IllegalStateException ignore) { - } catch (JMSException e) { - logError("Error closing shared JMS session", e); - } finally { - sharedSession = null; - } - } - - if (sharedConnection != null) { - log.debug("Closing shared Connection - service : " + serviceName); - try { - sharedConnection.close(); - } catch (IllegalStateException ignore) { - } catch (JMSException e) { - logError("Error closing shared JMS connection", e); - } finally { - sharedConnection = null; - } - } - if (state != STATE_FAILURE) { state = STATE_STOPPED; } @@ -316,14 +239,6 @@ for (MessageListenerTask 
lstTask : pollingTasks) { lstTask.pause(); } - - if (sharedConnection != null) { - try { - sharedConnection.stop(); - } catch (JMSException e) { - logError("Error pausing shared JMS connection", e); - } - } } /** @@ -333,14 +248,6 @@ for (MessageListenerTask lstTask : pollingTasks) { lstTask.resume(); } - - if (sharedConnection != null) { - try { - sharedConnection.start(); - } catch (JMSException e) { - logError("Error pausing shared JMS connection", e); - } - } } /** @@ -371,7 +278,7 @@ /** * The actual threads/tasks that perform message polling */ - private class MessageListenerTask implements Runnable { + private class MessageListenerTask implements Runnable, ExceptionListener { /** The Connection used by the polling task */ private Connection connection = null; @@ -398,7 +305,7 @@ */ public void pause() { if (isActive()) { - if (connection != null && connection != sharedConnection) { + if (connection != null) { try { connection.stop(); } catch (JMSException e) { @@ -413,7 +320,7 @@ * Resume this polling task */ public void resume() { - if (connection != null && connection != sharedConnection) { + if (connection != null) { try { connection.start(); } catch (JMSException e) { @@ -493,17 +400,16 @@ " is stopping after processing : " + messageCount + " messages"); } + closeConsumer(true); + closeSession(true); + closeConnection(true); + activeTaskCount--; synchronized(pollingTasks) { pollingTasks.remove(this); } // My time is up, so if I am going away, create another scheduleNewTaskIfAppropriate(); - - // close any non-shared resources - closeConsumer(consumer); - closeSession(session); - closeConnection(connection); } /** @@ -515,10 +421,10 @@ // get a new connection, session and consumer to prevent a conflict. 
// If idle, it means we can re-use what we already have - if (!idle) { + if (consumer == null) { connection = getConnection(); - session = getSession(connection); - consumer = getMessageConsumer(connection, session); + session = getSession(); + consumer = getMessageConsumer(); if (log.isDebugEnabled()) { log.debug("Preparing a Connection, Session and Consumer to read messages"); } @@ -575,7 +481,7 @@ } // close the consumer - closeConsumer(consumer); + closeConsumer(false); // if session was transacted, commit it or rollback try { @@ -617,9 +523,44 @@ " JTA txn for message : " + messageId + " from the session", e); } - closeSession(session); - closeConnection(connection); + closeSession(false); + closeConnection(false); + } + } + + /** Handle JMS Connection exceptions by re-initializing. A single connection failure could + * cause re-initialization of multiple MessageListenerTasks / Connections + */ + public void onException(JMSException j) { + + if (!isSTMActive()) { + return; } + + // if we failed while active, update state to show failure + setState(STATE_FAILURE); + log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks", j); + + int r = 1; + long retryDuration = initialReconnectDuration; + + do { + try { + log.info("Reconnection attempt : " + r + " for service : " + serviceName); + start(); + } catch (Exception e) { + log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName + + " failed. 
Next retry in " + (retryDuration/1000) + "seconds", e); + retryDuration = (long) (retryDuration * reconnectionProgressionFactor); + if (retryDuration > maxReconnectDuration) { + retryDuration = maxReconnectDuration; + } + + try { + Thread.sleep(retryDuration); + } catch (InterruptedException ignore) {} + } + } while (!isSTMActive()); } protected void requestShutdown() { @@ -633,151 +574,184 @@ protected boolean isTaskIdle() { return idle; } - } - // -------------- mundane private methods ---------------- - /** - * Close the given Connection, hiding exceptions if any which are logged - * @param connection the Connection to be closed - */ - private void closeConnection(Connection connection) { - try { - if (connection != null && connection != sharedConnection) { - if (log.isDebugEnabled()) { - log.debug("Closing non-shared JMS connection for service : " + serviceName); - } - connection.close(); + /** + * Get a Connection that could/should be used by this task - depends on the cache level to reuse + * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection + */ + private Connection getConnection() { + if (connection == null || cacheLevel < JMSConstants.CACHE_CONNECTION) { + connection = createConnection(); } - } catch (JMSException e) { - logError("Error closing JMS connection", e); - } finally { - connection = null; + return connection; } - } - /** - * Close the given Session, hiding exceptions if any which are logged - * @param session the Session to be closed - */ - private void closeSession(Session session) { - try { - if (session != null && session != sharedSession) { - if (log.isDebugEnabled()) { - log.debug("Closing non-shared JMS session for service : " + serviceName); - } - session.close(); + /** + * Get a Session that could/should be used by this task - depends on the cache level to reuse + * @param connection the connection (could be the shared connection) to use to create a Session + * @return the shared Session if cache 
level is higher than CACHE_CONNECTION, or a new Session + * created using the Connection passed, or a new/shared connection + */ + private Session getSession() { + if (session == null || cacheLevel < JMSConstants.CACHE_SESSION) { + session = createSession(); } - } catch (JMSException e) { - logError("Error closing JMS session", e); - } finally { - session = null; + return session; } - } - /** - * Close the given Consumer, hiding exceptions if any which are logged - * @param consumer the Consumer to be closed - */ - private void closeConsumer(MessageConsumer consumer) { - try { - if (consumer != null && consumer != sharedConsumer) { - if (log.isDebugEnabled()) { - log.debug("Closing non-shared JMS consumer for service : " + serviceName); - } - consumer.close(); - consumer = null; + /** + * Get a MessageConsumer that chould/should be used by this task - depends on the cache + * level to reuse + * @param connection option Connection to be used + * @param session optional Session to be used + * @return the shared MessageConsumer if cache level is higher than CACHE_SESSION, or a new + * MessageConsumer possibly using the Connection and Session passed in + */ + private MessageConsumer getMessageConsumer() { + if (consumer == null || cacheLevel < JMSConstants.CACHE_CONSUMER) { + consumer = createConsumer(); } - } catch (JMSException e) { - logError("Error closing JMS consumer", e); - } finally { - consumer = null; + return consumer; } - } - /** - * Get a Connection that could/should be used by this STM - depends on the cache level to reuse - * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection - */ - private Connection getConnection() { - if (cacheLevel > JMSConstants.CACHE_NONE) { - return getSharedConnection(); - } else { - return createConnection(); + /** + * Close the given Connection, hiding exceptions if any which are logged + * @param connection the Connection to be closed + */ + private void closeConnection(boolean forced) 
{ + if (connection != null && + (cacheLevel < JMSConstants.CACHE_CONNECTION || forced)) { + try { + if (log.isDebugEnabled()) { + log.debug("Closing non-shared JMS connection for service : " + serviceName); + } + connection.close(); + } catch (JMSException e) { + logError("Error closing JMS connection", e); + } finally { + connection = null; + } + } } - } - /** - * Get a Session that could/should be used by this STM - depends on the cache level to reuse - * @param connection the connection (could be the shared connection) to use to create a Session - * @return the shared Session if cache level is higher than CACHE_CONNECTION, or a new Session - * created using the Connection passed, or a new/shared connection - */ - private Session getSession(Connection connection) { - if (cacheLevel > JMSConstants.CACHE_CONNECTION) { - return getSharedSession(); - } else { - return createSession((connection == null ? getConnection() : connection)); + /** + * Close the given Session, hiding exceptions if any which are logged + * @param session the Session to be closed + */ + private void closeSession(boolean forced) { + if (session != null && + (cacheLevel < JMSConstants.CACHE_SESSION || forced)) { + try { + if (log.isDebugEnabled()) { + log.debug("Closing non-shared JMS session for service : " + serviceName); + } + session.close(); + } catch (JMSException e) { + logError("Error closing JMS session", e); + } finally { + session = null; + } + } } - } - /** - * Get a MessageConsumer that chould/should be used by this STM - depends on the cache level to - * reuse - * @param connection option Connection to be used - * @param session optional Session to be used - * @return the shared MessageConsumer if cache level is higher than CACHE_SESSION, or a new - * MessageConsumer possibly using the Connection and Session passed in - */ - private MessageConsumer getMessageConsumer(Connection connection, Session session) { - if (cacheLevel > JMSConstants.CACHE_SESSION) { - return 
getSharedConsumer(); - } else { - return createConsumer((session == null ? getSession(connection) : session)); + /** + * Close the given Consumer, hiding exceptions if any which are logged + * @param consumer the Consumer to be closed + */ + private void closeConsumer(boolean forced) { + if (consumer != null && + (cacheLevel < JMSConstants.CACHE_CONSUMER || forced)) { + try { + if (log.isDebugEnabled()) { + log.debug("Closing non-shared JMS consumer for service : " + serviceName); + } + consumer.close(); + } catch (JMSException e) { + logError("Error closing JMS consumer", e); + } finally { + consumer = null; + } + } } - } - /** - * Get the shared Connection for this STM - * @return shared Connection for the STM - */ - private synchronized Connection getSharedConnection() { - if (sharedConnection == null) { - sharedConnection = createConnection(); - if (log.isDebugEnabled()) { - log.debug("Created shared JMS Connection for service : " + serviceName); + /** + * Create a new Connection for this STM, using JNDI properties and credentials provided + * @return a new Connection for this STM, using JNDI properties and credentials provided + */ + private Connection createConnection() { + + try { + conFactory = JMSUtils.lookup( + getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName()); + log.info("Connected to the JMS connection factory : " + getConnFactoryJNDIName()); + } catch (NamingException e) { + handleException("Error looking up connection factory : " + getConnFactoryJNDIName() + + " using JNDI properties : " + jndiProperties, e); + } + + Connection connection = null; + try { + connection = JMSUtils.createConnection( + conFactory, + jndiProperties.get(Context.SECURITY_PRINCIPAL), + jndiProperties.get(Context.SECURITY_CREDENTIALS), + isJmsSpec11(), isQueue()); + + connection.setExceptionListener(this); + connection.start(); + log.info("JMS Connection for service : " + serviceName + " created and started"); + + } catch (JMSException e) { + 
handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() + + " using JNDI properties : " + jndiProperties, e); } + return connection; } - return sharedConnection; - } - /** - * Get the shared Session for the STM - * @return shared Session for the STM - */ - private synchronized Session getSharedSession() { - if (sharedSession == null) { - sharedSession = createSession(getSharedConnection()); - if (log.isDebugEnabled()) { - log.debug("Created shared JMS Session for service : " + serviceName); + /** + * Create a new Session for this STM + * @param connection the Connection to be used + * @return a new Session created using the Connection passed in + */ + private Session createSession() { + try { + if (log.isDebugEnabled()) { + log.debug("Creating a new JMS Session for service : " + serviceName); + } + return JMSUtils.createSession( + connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue()); + + } catch (JMSException e) { + handleException("Error creating JMS session for service : " + serviceName, e); } + return null; } - return sharedSession; - } - /** - * Get the shared MessageConsumer for the STM - * @return shared MessageConsumer for the STM - */ - private synchronized MessageConsumer getSharedConsumer() { - if (sharedConsumer == null) { - sharedConsumer = createConsumer(getSharedSession()); - if (log.isDebugEnabled()) { - log.debug("Created shared JMS MessageConsumer for service : " + serviceName); + /** + * Create a new MessageConsumer for this STM + * @param session the Session to be used + * @return a new MessageConsumer created using the Session passed in + */ + private MessageConsumer createConsumer() { + try { + if (log.isDebugEnabled()) { + log.debug("Creating a new JMS MessageConsumer for service : " + serviceName); + } + + return JMSUtils.createConsumer( + session, getDestination(), isQueue(), + (isSubscriptionDurable() && getDurableSubscriberName() == null ? 
+ getDurableSubscriberName() : serviceName), + getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11()); + + } catch (JMSException e) { + handleException("Error creating JMS consumer for service : " + serviceName,e); } + return null; } - return sharedConsumer; } + // -------------- mundane private methods ---------------- /** * Get the InitialContext for lookup using the JNDI parameters applicable to the service * @return the InitialContext to be used @@ -800,7 +774,7 @@ context = getInitialContext(); destination = JMSUtils.lookup(context, Destination.class, getDestinationJNDIName()); if (log.isDebugEnabled()) { - log.debug("JMS Destionation with JNDI name : " + getDestinationJNDIName() + + log.debug("JMS Destination with JNDI name : " + getDestinationJNDIName() + " found for service " + serviceName); } } catch (NamingException e) { @@ -847,85 +821,8 @@ return sharedUserTransaction; } - /** - * Create a new Connection for this STM, using JNDI properties and credentials provided - * @return a new Connection for this STM, using JNDI properties and credentials provided - */ - private Connection createConnection() { - - try { - conFactory = JMSUtils.lookup( - getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName()); - log.info("Connected to the JMS connection factory : " + getConnFactoryJNDIName()); - } catch (NamingException e) { - handleException("Error looking up connection factory : " + getConnFactoryJNDIName() + - " using JNDI properties : " + jndiProperties, e); - } - - Connection connection = null; - try { - connection = JMSUtils.createConnection( - conFactory, - jndiProperties.get(Context.SECURITY_PRINCIPAL), - jndiProperties.get(Context.SECURITY_CREDENTIALS), - isJmsSpec11(), isQueue()); - - connection.setExceptionListener(this); - connection.start(); - log.info("JMS Connection for service : " + serviceName + " created and started"); - - } catch (JMSException e) { - handleException("Error acquiring a JMS connection to : " 
+ getConnFactoryJNDIName() + - " using JNDI properties : " + jndiProperties, e); - } - return connection; - } - - /** - * Create a new Session for this STM - * @param connection the Connection to be used - * @return a new Session created using the Connection passed in - */ - private Session createSession(Connection connection) { - try { - if (log.isDebugEnabled()) { - log.debug("Creating a new JMS Session for service : " + serviceName); - } - return JMSUtils.createSession( - connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue()); - - } catch (JMSException e) { - handleException("Error creating JMS session for service : " + serviceName, e); - } - return null; - } - - /** - * Create a new MessageConsumer for this STM - * @param session the Session to be used - * @return a new MessageConsumer created using the Session passed in - */ - private MessageConsumer createConsumer(Session session) { - try { - if (log.isDebugEnabled()) { - log.debug("Creating a new JMS MessageConsumer for service : " + serviceName); - } - - return JMSUtils.createConsumer( - session, getDestination(), isQueue(), - (isSubscriptionDurable() && getDurableSubscriberName() == null ? 
- getDurableSubscriberName() : serviceName), - getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11()); - - } catch (JMSException e) { - handleException("Error creating JMS consumer for service : " + serviceName,e); - } - return null; - } - - // -------------------- trivial methods --------------------- - private boolean isActive() { + private boolean isSTMActive() { return state == STATE_STARTED; } @@ -1188,6 +1085,10 @@ return activeTaskCount; } + public void setState(int state) { + this.state = state; + } + //--------------------- used for development testing--------------------------- /*public static void main(String[] args) throws Exception { //org.apache.log4j.BasicConfigurator.configure(); Modified: webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java?rev=719648&r1=719647&r2=719648&view=diff ============================================================================== --- webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java (original) +++ webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java Fri Nov 21 09:34:18 2008 @@ -71,20 +71,24 @@ this.messages = messages; this.preloadMessages = preloadMessages; } + + private int concurrencyReached; + private final Object concurrencyReachedLock = new Object(); + private final Object shutdownAwaitLock = new Object(); @Override protected void runTest() throws Throwable { int endpointCount = channels.length; int expectedConcurrency = endpointCount * messages; - final CountDownLatch shutdownLatch = new CountDownLatch(1); - 
final CountDownLatch concurrencyReachedLatch = new CountDownLatch(expectedConcurrency); - final MessageReceiver messageReceiver = new MessageReceiver() { public void receive(MessageContext msgContext) throws AxisFault { - concurrencyReachedLatch.countDown(); + synchronized (concurrencyReachedLock) { + concurrencyReached++; + concurrencyReachedLock.notifyAll(); + } try { - shutdownLatch.await(); + shutdownAwaitLock.wait(); } catch (InterruptedException ex) { } } @@ -135,14 +139,25 @@ endpointResourceSets[i] = endpointResourceSet; } } - - if (!concurrencyReachedLatch.await(5, TimeUnit.SECONDS)) { - fail("Concurrency reached is " + (expectedConcurrency - - concurrencyReachedLatch.getCount()) + ", but expected " + - expectedConcurrency); + + long startTime = System.currentTimeMillis(); + while (concurrencyReached < expectedConcurrency + && System.currentTimeMillis() < (startTime + 5000)) { + synchronized(concurrencyReachedLock) { + concurrencyReachedLock.wait(); + } + } + + synchronized(shutdownAwaitLock) { + shutdownAwaitLock.notifyAll(); } + + if (concurrencyReached < expectedConcurrency) { + fail("Concurrency reached is " + concurrencyReached + ", but expected " + + expectedConcurrency); + } + } finally { - shutdownLatch.countDown(); for (int i=0; i<endpointCount; i++) { if (endpointResourceSets[i] != null) { endpointResourceSets[i].tearDown(); Modified: webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties?rev=719648&r1=719647&r2=719648&view=diff ============================================================================== --- webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties (original) +++ webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties Fri Nov 21 09:34:18 2008 @@ -20,10 +20,12 @@ # log4j configuration file used by unit tests log4j.rootCategory=DEBUG, 
CONSOLE +#log4j.rootCategory=WARN, CONSOLE log4j.category.org.apache.axis2.transport.jms=TRACE log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender log4j.appender.CONSOLE.threshold=ERROR +#log4j.appender.CONSOLE.threshold=TRACE log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout log4j.appender.CONSOLE.layout.ConversionPattern=%5p [%t] %c{1} %m%n Modified: webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java?rev=719648&r1=719647&r2=719648&view=diff ============================================================================== --- webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java (original) +++ webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java Fri Nov 21 09:34:18 2008 @@ -53,6 +53,7 @@ suite.addExclude("(&(test=EchoXML)(replyDestType=topic)(endpoint=axis))"); // Example to run a few use cases.. please leave these commented out - asankha + //suite.addExclude("(|(test=AsyncXML)(test=MinConcurrency)(destType=topic)(broker=qpid)(destType=topic)(replyDestType=topic)(client=jms)(endpoint=mock)(cfOnSender=true))"); //suite.addExclude("(|(test=EchoXML)(destType=queue)(broker=qpid)(cfOnSender=true)(singleCF=false)(destType=queue)(client=jms)(endpoint=mock))"); //suite.addExclude("(|(test=EchoXML)(test=AsyncXML)(test=AsyncSwA)(test=AsyncTextPlain)(test=AsyncBinary)(test=AsyncSOAPLarge)(broker=qpid))");