Author: veithen Date: Sat Jul 3 11:16:06 2010 New Revision: 960200 URL: http://svn.apache.org/viewvc?rev=960200&view=rev Log: AXIS2-4759: Applied patch submitted by Grant Patterson. Fixed ServiceTaskManager to properly close the connection. Also fixed some concurrency issues.
Modified: axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java Modified: axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java?rev=960200&r1=960199&r2=960200&view=diff ============================================================================== --- axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java (original) +++ axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java Sat Jul 3 11:16:06 2010 @@ -229,9 +229,9 @@ public class ServiceTaskManager { if (sharedConnection != null) { try { - sharedConnection.stop(); + sharedConnection.close(); } catch (JMSException e) { - logError("Error stopping shared Connection", e); + logError("Error closing shared Connection", e); } finally { sharedConnection = null; } @@ -337,7 +337,7 @@ public class ServiceTaskManager { /** Is this task idle right now? */ private volatile boolean idle = false; /** Is this task connected to the JMS provider successfully? 
*/ - private boolean connected = false; + private volatile boolean connected = false; /** As soon as we create a new polling task, add it to the STM for control later */ MessageListenerTask() { @@ -439,31 +439,36 @@ public class ServiceTaskManager { } } finally { + + if (log.isTraceEnabled()) { + log.trace("Listener task with Thread ID : " + Thread.currentThread().getId() + + " is stopping after processing : " + messageCount + " messages :: " + + " isActive : " + isActive() + " maxMessagesPerTask : " + + getMaxMessagesPerTask() + " concurrentConsumers : " + getConcurrentConsumers() + + " idleExecutionCount : " + idleExecutionCount + " idleTaskExecutionLimit : " + + getIdleTaskExecutionLimit()); + } else if (log.isDebugEnabled()) { + log.debug("Listener task with Thread ID : " + Thread.currentThread().getId() + + " is stopping after processing : " + messageCount + " messages"); + } + + // Close the consumer and session before decrementing activeTaskCount. + // (If we have a shared connection, Qpid deadlocks if the shared connection + // is closed on another thread while closing the session) + closeConsumer(true); + closeSession(true); + closeConnection(); + workerState = STATE_STOPPED; activeTaskCount--; synchronized(pollingTasks) { pollingTasks.remove(this); } + + // My time is up, so if I am going away, create another + scheduleNewTaskIfAppropriate(); } - if (log.isTraceEnabled()) { - log.trace("Listener task with Thread ID : " + Thread.currentThread().getId() + - " is stopping after processing : " + messageCount + " messages :: " + - " isActive : " + isActive() + " maxMessagesPerTask : " + - getMaxMessagesPerTask() + " concurrentConsumers : " + getConcurrentConsumers() + - " idleExecutionCount : " + idleExecutionCount + " idleTaskExecutionLimit : " + - getIdleTaskExecutionLimit()); - } else if (log.isDebugEnabled()) { - log.debug("Listener task with Thread ID : " + Thread.currentThread().getId() + - " is stopping after processing : " + messageCount + " 
messages"); - } - - closeConsumer(true); - closeSession(true); - closeConnection(); - - // My time is up, so if I am going away, create another - scheduleNewTaskIfAppropriate(); } /** @@ -670,20 +675,23 @@ public class ServiceTaskManager { // Connection is not shared if (connection == null) { connection = createConnection(); + setConnected(true); } - } else { - if (sharedConnection != null) { - connection = sharedConnection; - } else { - synchronized(this) { - if (sharedConnection == null) { - sharedConnection = createConnection(); - } - connection = sharedConnection; - } - } + + } else if (connection == null) { + // Connection is shared, but may not have been created + + synchronized(ServiceTaskManager.this) { + if (sharedConnection == null) { + sharedConnection = createConnection(); + } + } + connection = sharedConnection; + setConnected(true); + } - setConnected(true); + // else: Connection is shared and is already referenced by this.connection + return connection; }