Author: asankha
Date: Sun Nov 30 09:47:29 2008
New Revision: 721864

URL: http://svn.apache.org/viewvc?rev=721864&view=rev
Log:
Share connection between all tasks of a STM when connection or above is shared 
(http://markmail.org/message/j2f5xdrtfeuoup7f)
Use two variables to store state of STM and its individual tasks

Modified:
    
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java

Modified: 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java?rev=721864&r1=721863&r2=721864&view=diff
==============================================================================
--- 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
 (original)
+++ 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
 Sun Nov 30 09:47:29 2008
@@ -132,12 +132,15 @@
     private JMSMessageReceiver jmsMessageReceiver = null;
 
     /** State of this Task Manager */
-    private volatile int state = STATE_STOPPED;
+    private volatile int serviceTaskManagerState = STATE_STOPPED;
     /** Number of invoker tasks active */
     private volatile int activeTaskCount = 0;
     /** The shared thread pool from the Listener */
     private WorkerPool workerPool = null;
 
+    /** The JMS Connection shared between multiple polling tasks - when 
enabled (recommended) */
+    private Connection sharedConnection = null;
+
     /**
      * Start or re-start the Task Manager by shutting down any existing worker 
tasks and
      * re-creating them. However, if this STM is PAUSED, a start request is 
ignored.
@@ -146,13 +149,13 @@
      */
     public synchronized void start() {
 
-        if (state == STATE_PAUSED) {
+        if (serviceTaskManagerState == STATE_PAUSED) {
             log.info("Attempt to re-start paused TaskManager is ignored. 
Please use resume instead");
             return;
         }
 
         // if any tasks are running, stop whats running now
-        if (pollingTasks.isEmpty()) {
+        if (!pollingTasks.isEmpty()) {
             stop();
         }
 
@@ -167,7 +170,7 @@
                     "worker tasks of service : " + serviceName);
                 break;
             case JMSConstants.CACHE_CONNECTION:
-                log.debug("Only the JMS Connection will be cached and shared 
between successive " +
+                log.debug("Only the JMS Connection will be cached and shared 
between *all* " +
                     "poller task invocations");
                 break;
             case JMSConstants.CACHE_SESSION:
@@ -188,7 +191,7 @@
             workerPool.execute(new MessageListenerTask());
         }
 
-        state = STATE_STARTED;
+        serviceTaskManagerState = STATE_STARTED;
         log.info("Task manager for service : " + serviceName + " 
[re-]initialized");
     }
 
@@ -201,8 +204,8 @@
             log.debug("Stopping ServiceTaskManager for service : " + 
serviceName);
         }
 
-        if (state != STATE_FAILURE) {
-            state = STATE_SHUTTING_DOWN;
+        if (serviceTaskManagerState != STATE_FAILURE) {
+            serviceTaskManagerState = STATE_SHUTTING_DOWN;
         }
 
         synchronized(pollingTasks) {
@@ -221,12 +224,22 @@
             } catch (InterruptedException ignore) {}
         }
 
+        if (sharedConnection != null) {
+            try {
+                sharedConnection.stop();
+            } catch (JMSException e) {
+                logError("Error stopping shared Connection", e);
+            } finally {
+                sharedConnection = null;
+            }
+        }
+
         if (activeTaskCount > 0) {
             log.warn("Unable to shutdown all polling tasks of service : " + 
serviceName);
         }
 
-        if (state != STATE_FAILURE) {
-            state = STATE_STOPPED;
+        if (serviceTaskManagerState != STATE_FAILURE) {
+            serviceTaskManagerState = STATE_STOPPED;
         }
         log.info("Task manager for service : " + serviceName + " shutdown");
     }
@@ -235,19 +248,33 @@
      * Temporarily suspend receipt and processing of messages. Accomplished by 
stopping the
      * connection / or connections used by the poller tasks
      */
-    public void pause() {
+    public synchronized void pause() {
         for (MessageListenerTask lstTask : pollingTasks) {
             lstTask.pause();
         }
+        if (sharedConnection != null) {
+            try {
+                sharedConnection.stop();
+            } catch (JMSException e) {
+                logError("Error pausing shared Connection", e);
+            }
+        }
     }
 
     /**
      * Resume receipt and processing of messages of paused tasks
      */
-    public void resume() {
+    public synchronized void resume() {
         for (MessageListenerTask lstTask : pollingTasks) {
             lstTask.resume();
         }
+        if (sharedConnection != null) {
+            try {
+                sharedConnection.start();
+            } catch (JMSException e) {
+                logError("Error resuming shared Connection", e);
+            }
+        }
     }
 
     /**
@@ -255,7 +282,7 @@
      * e do not have any idle tasks - i.e. scale up listening
      */
     private void scheduleNewTaskIfAppropriate() {
-        if (state == STATE_STARTED &&
+        if (serviceTaskManagerState == STATE_STARTED &&
             pollingTasks.size() < getMaxConcurrentConsumers() && 
getIdleTaskCount() == 0) {
             workerPool.execute(new MessageListenerTask());
         }
@@ -287,7 +314,7 @@
         /** The MessageConsumer used by the polling task */
         private MessageConsumer consumer = null;
         /** State of the worker polling task */
-        private volatile int state = STATE_STOPPED;
+        private volatile int workerState = STATE_STOPPED;
         /** The number of idle (i.e. without fetching a message) polls for 
this task */
         private int idleExecutionCount = 0;
         /** Is this task idle right now? */
@@ -305,14 +332,14 @@
          */
         public void pause() {
             if (isActive()) {
-                if (connection != null) {
+                if (connection != null && cacheLevel < 
JMSConstants.CACHE_CONNECTION) {
                     try {
                         connection.stop();
                     } catch (JMSException e) {
                         log.warn("Error pausing Message Listener task for 
service : " + serviceName);
                     }
                 }
-                state = STATE_PAUSED;
+                workerState = STATE_PAUSED;
             }
         }
 
@@ -320,21 +347,21 @@
          * Resume this polling task
          */
         public void resume() {
-            if (connection != null) {
+            if (connection != null && cacheLevel < 
JMSConstants.CACHE_CONNECTION) {
                 try {
                     connection.start();
                 } catch (JMSException e) {
                     log.warn("Error resuming Message Listener task for service 
: " + serviceName);
                 }
             }
-            state = STATE_STARTED;
+            workerState = STATE_STARTED;
         }
 
         /**
          * Execute the polling worker task
          */
         public void run() {
-            state = STATE_STARTED;
+            workerState = STATE_STARTED;
             activeTaskCount++;
             int messageCount = 0;
 
@@ -402,7 +429,7 @@
 
             closeConsumer(true);
             closeSession(true);
-            closeConnection(true);
+            closeConnection();
 
             activeTaskCount--;
             synchronized(pollingTasks) {
@@ -524,7 +551,7 @@
                 }
 
                 closeSession(false);
-                closeConnection(false);
+                closeConnection();
             }
         }
 
@@ -537,8 +564,14 @@
                 return;
             }
 
+            if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
+                // failed Connection was not shared, thus no need to restart 
the whole STM
+                requestShutdown();
+                return;
+            }
+
             // if we failed while active, update state to show failure
-            setState(STATE_FAILURE);
+            setServiceTaskManagerState(STATE_FAILURE);
             log.error("JMS Connection failed : " + j.getMessage() + " - 
shutting down worker tasks", j);
 
             int r = 1;
@@ -564,11 +597,11 @@
         }
 
         protected void requestShutdown() {
-            state = STATE_SHUTTING_DOWN;
+            workerState = STATE_SHUTTING_DOWN;
         }
 
         private boolean isActive() {
-            return state == STATE_STARTED;
+            return workerState == STATE_STARTED;
         }
 
         protected boolean isTaskIdle() {
@@ -580,8 +613,22 @@
          * @return the shared Connection if cache level is higher than 
CACHE_NONE, or a new Connection
          */
         private Connection getConnection() {
-            if (connection == null || cacheLevel < 
JMSConstants.CACHE_CONNECTION) {
-                connection = createConnection();
+            if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
+                // Connection is not shared
+                if (connection == null) {
+                    connection = createConnection();
+                }
+            } else {
+                if (sharedConnection != null) {
+                    connection = sharedConnection;
+                } else {
+                    synchronized(this) {
+                        if (sharedConnection == null) {
+                            sharedConnection = createConnection();
+                        }
+                        connection = sharedConnection;
+                    }
+                }
             }
             return connection;
         }
@@ -618,9 +665,9 @@
          * Close the given Connection, hiding exceptions if any which are 
logged
          * @param connection the Connection to be closed
          */
-        private void closeConnection(boolean forced) {
+        private void closeConnection() {
             if (connection != null &&
-                (cacheLevel < JMSConstants.CACHE_CONNECTION || forced)) {
+                cacheLevel < JMSConstants.CACHE_CONNECTION) {
                 try {
                     if (log.isDebugEnabled()) {
                         log.debug("Closing non-shared JMS connection for 
service : " + serviceName);
@@ -823,7 +870,7 @@
 
     // -------------------- trivial methods ---------------------
     private boolean isSTMActive() {
-        return state == STATE_STARTED;
+        return serviceTaskManagerState == STATE_STARTED;
     }
 
     /**
@@ -1085,41 +1132,7 @@
         return activeTaskCount;
     }
 
-    public void setState(int state) {
-        this.state = state;
+    public void setServiceTaskManagerState(int serviceTaskManagerState) {
+        this.serviceTaskManagerState = serviceTaskManagerState;
     }
-
-    //--------------------- used for development 
testing---------------------------
-    /*public static void main(String[] args) throws Exception {
-        //org.apache.log4j.BasicConfigurator.configure();
-        new ServiceTaskManager().testSTM();
-    }
-
-    private void testSTM() throws Exception {
-        ServiceTaskManager stm = new ServiceTaskManager();
-        Hashtable<String, String> props = new Hashtable<String, String>();
-        props.put("java.naming.factory.initial", 
"weblogic.jndi.WLInitialContextFactory");
-        props.put("java.naming.provider.url", "t3://localhost:7001");
-        stm.setJndiProperties(props);
-        stm.setConnFactoryJNDIName("weblogic.jms.ConnectionFactory");
-        stm.setDestinationJNDIName("weblogic.examples.jms.MyQueue");
-        stm.setServiceName("test");
-        stm.setCacheLevel(JMSConstants.CACHE_CONNECTION);
-        stm.setMaxConcurrentConsumers(40);
-
-        stm.workerPool = new NativeWorkerPool(20, 40, 5, 100, "JMS-Worker", 
"jms");
-        stm.start();
-    }
-
-    public boolean processMessage(Message msg, UserTransaction ut) {
-        try {
-            if (msg instanceof TextMessage) {
-                System.out.println("Received : " + ((TextMessage) 
msg).getText());
-            }
-            return true;
-        } catch (JMSException e) {
-            e.printStackTrace();
-            return false;
-        }
-    }*/
 }


Reply via email to