Author: asankha Date: Sun Nov 30 09:47:29 2008 New Revision: 721864 URL: http://svn.apache.org/viewvc?rev=721864&view=rev Log: Share connection between all tasks of an STM when connection or above is shared (http://markmail.org/message/j2f5xdrtfeuoup7f). Use two variables to store the state of the STM and of its individual tasks.
Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java?rev=721864&r1=721863&r2=721864&view=diff ============================================================================== --- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java (original) +++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java Sun Nov 30 09:47:29 2008 @@ -132,12 +132,15 @@ private JMSMessageReceiver jmsMessageReceiver = null; /** State of this Task Manager */ - private volatile int state = STATE_STOPPED; + private volatile int serviceTaskManagerState = STATE_STOPPED; /** Number of invoker tasks active */ private volatile int activeTaskCount = 0; /** The shared thread pool from the Listener */ private WorkerPool workerPool = null; + /** The JMS Connection shared between multiple polling tasks - when enabled (reccomended) */ + private Connection sharedConnection = null; + /** * Start or re-start the Task Manager by shutting down any existing worker tasks and * re-creating them. However, if this is STM is PAUSED, a start request is ignored. @@ -146,13 +149,13 @@ */ public synchronized void start() { - if (state == STATE_PAUSED) { + if (serviceTaskManagerState == STATE_PAUSED) { log.info("Attempt to re-start paused TaskManager is ignored. 
Please use resume instead"); return; } // if any tasks are running, stop whats running now - if (pollingTasks.isEmpty()) { + if (!pollingTasks.isEmpty()) { stop(); } @@ -167,7 +170,7 @@ "worker tasks of service : " + serviceName); break; case JMSConstants.CACHE_CONNECTION: - log.debug("Only the JMS Connection will be cached and shared between successive " + + log.debug("Only the JMS Connection will be cached and shared between *all* " + "poller task invocations"); break; case JMSConstants.CACHE_SESSION: @@ -188,7 +191,7 @@ workerPool.execute(new MessageListenerTask()); } - state = STATE_STARTED; + serviceTaskManagerState = STATE_STARTED; log.info("Task manager for service : " + serviceName + " [re-]initialized"); } @@ -201,8 +204,8 @@ log.debug("Stopping ServiceTaskManager for service : " + serviceName); } - if (state != STATE_FAILURE) { - state = STATE_SHUTTING_DOWN; + if (serviceTaskManagerState != STATE_FAILURE) { + serviceTaskManagerState = STATE_SHUTTING_DOWN; } synchronized(pollingTasks) { @@ -221,12 +224,22 @@ } catch (InterruptedException ignore) {} } + if (sharedConnection != null) { + try { + sharedConnection.stop(); + } catch (JMSException e) { + logError("Error stopping shared Connection", e); + } finally { + sharedConnection = null; + } + } + if (activeTaskCount > 0) { log.warn("Unable to shutdown all polling tasks of service : " + serviceName); } - if (state != STATE_FAILURE) { - state = STATE_STOPPED; + if (serviceTaskManagerState != STATE_FAILURE) { + serviceTaskManagerState = STATE_STOPPED; } log.info("Task manager for service : " + serviceName + " shutdown"); } @@ -235,19 +248,33 @@ * Temporarily suspend receipt and processing of messages. 
Accomplished by stopping the * connection / or connections used by the poller tasks */ - public void pause() { + public synchronized void pause() { for (MessageListenerTask lstTask : pollingTasks) { lstTask.pause(); } + if (sharedConnection != null) { + try { + sharedConnection.stop(); + } catch (JMSException e) { + logError("Error pausing shared Connection", e); + } + } } /** * Resume receipt and processing of messages of paused tasks */ - public void resume() { + public synchronized void resume() { for (MessageListenerTask lstTask : pollingTasks) { lstTask.resume(); } + if (sharedConnection != null) { + try { + sharedConnection.start(); + } catch (JMSException e) { + logError("Error resuming shared Connection", e); + } + } } /** @@ -255,7 +282,7 @@ * e do not have any idle tasks - i.e. scale up listening */ private void scheduleNewTaskIfAppropriate() { - if (state == STATE_STARTED && + if (serviceTaskManagerState == STATE_STARTED && pollingTasks.size() < getMaxConcurrentConsumers() && getIdleTaskCount() == 0) { workerPool.execute(new MessageListenerTask()); } @@ -287,7 +314,7 @@ /** The MessageConsumer used by the polling task */ private MessageConsumer consumer = null; /** State of the worker polling task */ - private volatile int state = STATE_STOPPED; + private volatile int workerState = STATE_STOPPED; /** The number of idle (i.e. without fetching a message) polls for this task */ private int idleExecutionCount = 0; /** Is this task idle right now? 
*/ @@ -305,14 +332,14 @@ */ public void pause() { if (isActive()) { - if (connection != null) { + if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { try { connection.stop(); } catch (JMSException e) { log.warn("Error pausing Message Listener task for service : " + serviceName); } } - state = STATE_PAUSED; + workerState = STATE_PAUSED; } } @@ -320,21 +347,21 @@ * Resume this polling task */ public void resume() { - if (connection != null) { + if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { try { connection.start(); } catch (JMSException e) { log.warn("Error resuming Message Listener task for service : " + serviceName); } } - state = STATE_STARTED; + workerState = STATE_STARTED; } /** * Execute the polling worker task */ public void run() { - state = STATE_STARTED; + workerState = STATE_STARTED; activeTaskCount++; int messageCount = 0; @@ -402,7 +429,7 @@ closeConsumer(true); closeSession(true); - closeConnection(true); + closeConnection(); activeTaskCount--; synchronized(pollingTasks) { @@ -524,7 +551,7 @@ } closeSession(false); - closeConnection(false); + closeConnection(); } } @@ -537,8 +564,14 @@ return; } + if (cacheLevel < JMSConstants.CACHE_CONNECTION) { + // failed Connection was not shared, thus no need to restart the whole STM + requestShutdown(); + return; + } + // if we failed while active, update state to show failure - setState(STATE_FAILURE); + setServiceTaskManagerState(STATE_FAILURE); log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks", j); int r = 1; @@ -564,11 +597,11 @@ } protected void requestShutdown() { - state = STATE_SHUTTING_DOWN; + workerState = STATE_SHUTTING_DOWN; } private boolean isActive() { - return state == STATE_STARTED; + return workerState == STATE_STARTED; } protected boolean isTaskIdle() { @@ -580,8 +613,22 @@ * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection */ private Connection getConnection() { - if 
(connection == null || cacheLevel < JMSConstants.CACHE_CONNECTION) { - connection = createConnection(); + if (cacheLevel < JMSConstants.CACHE_CONNECTION) { + // Connection is not shared + if (connection == null) { + connection = createConnection(); + } + } else { + if (sharedConnection != null) { + connection = sharedConnection; + } else { + synchronized(this) { + if (sharedConnection == null) { + sharedConnection = createConnection(); + } + connection = sharedConnection; + } + } } return connection; } @@ -618,9 +665,9 @@ * Close the given Connection, hiding exceptions if any which are logged * @param connection the Connection to be closed */ - private void closeConnection(boolean forced) { + private void closeConnection() { if (connection != null && - (cacheLevel < JMSConstants.CACHE_CONNECTION || forced)) { + cacheLevel < JMSConstants.CACHE_CONNECTION) { try { if (log.isDebugEnabled()) { log.debug("Closing non-shared JMS connection for service : " + serviceName); @@ -823,7 +870,7 @@ // -------------------- trivial methods --------------------- private boolean isSTMActive() { - return state == STATE_STARTED; + return serviceTaskManagerState == STATE_STARTED; } /** @@ -1085,41 +1132,7 @@ return activeTaskCount; } - public void setState(int state) { - this.state = state; + public void setServiceTaskManagerState(int serviceTaskManagerState) { + this.serviceTaskManagerState = serviceTaskManagerState; } - - //--------------------- used for development testing--------------------------- - /*public static void main(String[] args) throws Exception { - //org.apache.log4j.BasicConfigurator.configure(); - new ServiceTaskManager().testSTM(); - } - - private void testSTM() throws Exception { - ServiceTaskManager stm = new ServiceTaskManager(); - Hashtable<String, String> props = new Hashtable<String, String>(); - props.put("java.naming.factory.initial", "weblogic.jndi.WLInitialContextFactory"); - props.put("java.naming.provider.url", "t3://localhost:7001"); - 
stm.setJndiProperties(props); - stm.setConnFactoryJNDIName("weblogic.jms.ConnectionFactory"); - stm.setDestinationJNDIName("weblogic.examples.jms.MyQueue"); - stm.setServiceName("test"); - stm.setCacheLevel(JMSConstants.CACHE_CONNECTION); - stm.setMaxConcurrentConsumers(40); - - stm.workerPool = new NativeWorkerPool(20, 40, 5, 100, "JMS-Worker", "jms"); - stm.start(); - } - - public boolean processMessage(Message msg, UserTransaction ut) { - try { - if (msg instanceof TextMessage) { - System.out.println("Received : " + ((TextMessage) msg).getText()); - } - return true; - } catch (JMSException e) { - e.printStackTrace(); - return false; - } - }*/ }