Author: asankha
Date: Sun Dec  7 20:48:21 2008
New Revision: 724253

URL: http://svn.apache.org/viewvc?rev=724253&view=rev
Log:
AXIS2-4164 - Add two new parameters to specify JMS username and password 
(transport.jms.UserName and transport.jms.Password) 
Fix issue with the generation of the WSDL EPR
Correctly call JMSMessageSender.close() after a send operation, and release JMS 
resources as appropriate
Support ActiveMQ dynamicQueues/ and dynamicTopics/ prefixes for convenience, 
since most of the demonstrations will be against AMQ
Test on connection termination/exceptions and re-connection - probably could 
refine this logic a bit more



Modified:
    
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
    
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java
    
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
    
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java
    
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java
    
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
    
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
    
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
    
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html

Modified: 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
--- 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
 (original)
+++ 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
 Sun Dec  7 20:48:21 2008
@@ -254,8 +254,8 @@
         try {
             connection = JMSUtils.createConnection(
                 conFactory,
-                parameters.get(Context.SECURITY_PRINCIPAL),
-                parameters.get(Context.SECURITY_CREDENTIALS),
+                parameters.get(JMSConstants.PARAM_JMS_USERNAME),
+                parameters.get(JMSConstants.PARAM_JMS_PASSWORD),
                 isJmsSpec11(), isQueue());
 
             if (log.isDebugEnabled()) {

Modified: 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
--- 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java
 (original)
+++ 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java
 Sun Dec  7 20:48:21 2008
@@ -188,6 +188,11 @@
     /** @see PARAM_RECON_INIT_DURATION */
     public static final String PARAM_RECON_MAX_DURATION = 
"transport.jms.MaxReconnectDuration";
 
+    /** The username to use when obtaining a JMS Connection */
+    public static final String PARAM_JMS_USERNAME = "transport.jms.UserName";
+    /** The password to use when obtaining a JMS Connection */
+    public static final String PARAM_JMS_PASSWORD = "transport.jms.Password";
+
     //-------------- message context / transport header properties and client 
options --------------
     /**
      * A MessageContext property or client Option indicating the JMS message 
type

Modified: 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
--- 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
 (original)
+++ 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
 Sun Dec  7 20:48:21 2008
@@ -122,8 +122,7 @@
         JMSEndpoint endpoint = new JMSEndpoint();
         endpoint.setService(service);
         endpoint.setCf(cf);
-        endpoint.computeEPRs(); // compute service EPR and keep for later use
-        
+
         Parameter destParam = 
service.getParameter(JMSConstants.PARAM_DESTINATION);
         if (destParam != null) {
             endpoint.setJndiDestinationName((String)destParam.getValue());
@@ -135,7 +134,7 @@
         Parameter destTypeParam = 
service.getParameter(JMSConstants.PARAM_DEST_TYPE);
         if (destTypeParam != null) {
             String paramValue = (String) destTypeParam.getValue();
-            if(JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) ||
+            if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) ||
                     JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) )  {
                 endpoint.setDestinationType(paramValue);
             } else {
@@ -156,6 +155,8 @@
         } else {
             
endpoint.setContentTypeRuleSet(ContentTypeRuleFactory.parse(contentTypeParam));
         }
+
+        endpoint.computeEPRs(); // compute service EPR and keep for later use  
      
         serviceNameToEndpointMap.put(service.getName(), endpoint);
         
         ServiceTaskManager stm = JMSUtils.createTaskManagerForService(cf, 
service, workerPool);

Modified: 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
--- 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java
 (original)
+++ 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java
 Sun Dec  7 20:48:21 2008
@@ -230,27 +230,33 @@
      * Close non-shared producer, session and connection if any
      */
     public void close() {
-        if (cacheLevel < JMSConstants.CACHE_PRODUCER) {
+        if (producer != null && cacheLevel < JMSConstants.CACHE_PRODUCER) {
             try {
                 producer.close();
             } catch (JMSException e) {
                 log.error("Error closing JMS MessageProducer after send", e);
+            } finally {
+                producer = null;
             }
         }
 
-        if (cacheLevel < JMSConstants.CACHE_SESSION) {
+        if (session != null && cacheLevel < JMSConstants.CACHE_SESSION) {
             try {
                 session.close();
             } catch (JMSException e) {
                 log.error("Error closing JMS Session after send", e);
+            } finally {
+                session = null;
             }
         }
 
-        if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
+        if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) {
             try {
                 connection.close();
             } catch (JMSException e) {
                 log.error("Error closing JMS Connection after send", e);
+            } finally {
+                connection = null;
             }
         }
     }

Modified: 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
--- 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java
 (original)
+++ 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java
 Sun Dec  7 20:48:21 2008
@@ -168,8 +168,12 @@
         try {
             return JMSUtils.lookup(context, Destination.class, 
destinationName);
         } catch (NameNotFoundException e) {
-            if (log.isDebugEnabled()) {
-                log.debug("Cannot locate destination : " + destinationName + " 
using " + url);
+            try {
+                return JMSUtils.lookup(context, Destination.class,
+                    
(JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ?
+                        "dynamicTopics/" : "dynamicQueues/") + 
destinationName);
+            } catch (NamingException x) {
+                handleException("Cannot locate destination : " + 
destinationName + " using " + url);
             }
         } catch (NamingException e) {
             handleException("Cannot locate destination : " + destinationName + 
" using " + url, e);

Modified: 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
--- 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
 (original)
+++ 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
 Sun Dec  7 20:48:21 2008
@@ -72,9 +72,9 @@
      */
     private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo 
trpInfo) {
         Map<String,String> props = trpInfo.getProperties();
-        if(trpInfo.getProperties() != null) {
+        if (trpInfo.getProperties() != null) {
             String jmsConnectionFactoryName = 
props.get(JMSConstants.PARAM_JMS_CONFAC);
-            if(jmsConnectionFactoryName != null) {
+            if (jmsConnectionFactoryName != null) {
                 return 
connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName);
             } else {
                 return connFacManager.getJMSConnectionFactory(props);
@@ -134,7 +134,11 @@
 
         // need to synchronize as Sessions are not thread safe
         synchronized (messageSender.getSession()) {
-            sendOverJMS(msgCtx, messageSender, contentTypeProperty, 
jmsConnectionFactory, jmsOut);
+            try {
+                sendOverJMS(msgCtx, messageSender, contentTypeProperty, 
jmsConnectionFactory, jmsOut);
+            } finally {
+                messageSender.close();
+            }
         }
     }
 

Modified: 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
--- 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
 (original)
+++ 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
 Sun Dec  7 20:48:21 2008
@@ -80,7 +80,8 @@
      * Get the EPR for the given JMS connection factory and destination
      * the form of the URL is
      * jms:/<destination>?[<key>=<value>&]*
-     * Credentials Context.SECURITY_PRINCIPAL and Context.SECURITY_CREDENTIALS 
are filtered
+     * Credentials Context.SECURITY_PRINCIPAL, Context.SECURITY_CREDENTIALS
+     * JMSConstants.PARAM_JMS_USERNAME and JMSConstants.PARAM_JMS_USERNAME are 
filtered
      *
      * @param cf the Axis2 JMS connection factory
      * @param destinationType the type of destination
@@ -110,7 +111,9 @@
 
         for (Map.Entry<String,String> entry : cf.getParameters().entrySet()) {
             if (!Context.SECURITY_PRINCIPAL.equalsIgnoreCase(entry.getKey()) &&
-                
!Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey())) {
+                !Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey()) 
&&
+                
!JMSConstants.PARAM_JMS_USERNAME.equalsIgnoreCase(entry.getKey()) &&
+                
!JMSConstants.PARAM_JMS_PASSWORD.equalsIgnoreCase(entry.getKey())) {
                 sb.append("&").append(
                     entry.getKey()).append("=").append(entry.getValue());
             }
@@ -584,12 +587,16 @@
         ServiceTaskManager stm = new ServiceTaskManager();
 
         stm.setServiceName(name);
-        stm.setJndiProperties(jcf.getParameters());
+        stm.addJmsProperties(cf);
+        stm.addJmsProperties(svc);
 
         stm.setConnFactoryJNDIName(
             getRqdStringProperty(JMSConstants.PARAM_CONFAC_JNDI_NAME, svc, 
cf));
-        stm.setDestinationJNDIName(
-            getRqdStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf));
+        String destName = 
getOptionalStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf);
+        if (destName == null) {
+            destName = service.getName();
+        }
+        stm.setDestinationJNDIName(destName);
         stm.setDestinationType(getDestinationType(svc, cf));
 
         stm.setJmsSpec11(
@@ -624,7 +631,7 @@
         if (value != null) {
             stm.setConcurrentConsumers(value);
         }
-        value = 
getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf);
+        value = getOptionalIntProperty(JMSConstants.PARAM_MAX_CONSUMERS, svc, 
cf);
         if (value != null) {
             stm.setMaxConcurrentConsumers(value);
         }
@@ -651,6 +658,29 @@
         }
 
         stm.setWorkerPool(workerPool);
+
+        // remove processed properties from property bag
+        stm.removeJmsProperties(JMSConstants.PARAM_CONFAC_JNDI_NAME);
+        stm.removeJmsProperties(JMSConstants.PARAM_DESTINATION);
+        stm.removeJmsProperties(JMSConstants.PARAM_JMS_SPEC_VER);
+        stm.removeJmsProperties(BaseConstants.PARAM_TRANSACTIONALITY);
+        stm.removeJmsProperties(BaseConstants.PARAM_CACHE_USER_TXN);
+        stm.removeJmsProperties(BaseConstants.PARAM_USER_TXN_JNDI_NAME);
+        stm.removeJmsProperties(JMSConstants.PARAM_SESSION_TRANSACTED);
+        stm.removeJmsProperties(JMSConstants.PARAM_MSG_SELECTOR);
+        stm.removeJmsProperties(JMSConstants.PARAM_SUB_DURABLE);
+        stm.removeJmsProperties(JMSConstants.PARAM_DURABLE_SUB_NAME);
+        stm.removeJmsProperties(JMSConstants.PARAM_CACHE_LEVEL);
+        stm.removeJmsProperties(JMSConstants.PARAM_PUBSUB_NO_LOCAL);
+        stm.removeJmsProperties(JMSConstants.PARAM_RCV_TIMEOUT);
+        stm.removeJmsProperties(JMSConstants.PARAM_CONCURRENT_CONSUMERS);
+        stm.removeJmsProperties(JMSConstants.PARAM_MAX_CONSUMERS);
+        stm.removeJmsProperties(JMSConstants.PARAM_IDLE_TASK_LIMIT);
+        stm.removeJmsProperties(JMSConstants.PARAM_MAX_MSGS_PER_TASK);
+        stm.removeJmsProperties(JMSConstants.PARAM_RECON_INIT_DURATION);
+        stm.removeJmsProperties(JMSConstants.PARAM_RECON_MAX_DURATION);
+        stm.removeJmsProperties(JMSConstants.PARAM_RECON_FACTOR);
+
         return stm;
     }
 
@@ -990,9 +1020,9 @@
         jmsOut.loadConnectionFactoryFromProperies();
 
         // create a one time connection and session to be used
-        Hashtable<String,String> jndiProps = jmsOut.getProperties();
-        String user = jndiProps != null ? 
jndiProps.get(Context.SECURITY_PRINCIPAL) : null;
-        String pass = jndiProps != null ? 
jndiProps.get(Context.SECURITY_CREDENTIALS) : null;
+        Hashtable<String,String> jmsProps = jmsOut.getProperties();
+        String user = jmsProps != null ? 
jmsProps.get(JMSConstants.PARAM_JMS_USERNAME) : null;
+        String pass = jmsProps != null ? 
jmsProps.get(JMSConstants.PARAM_JMS_PASSWORD) : null;
 
         QueueConnectionFactory qConFac = null;
         TopicConnectionFactory tConFac = null;
@@ -1041,8 +1071,9 @@
         }
 
         return new JMSMessageSender(connection, session, producer,
-            destination, JMSConstants.CACHE_NONE, false,
-                destType == -1 ? null : destType == JMSConstants.QUEUE ? 
Boolean.TRUE : Boolean.FALSE);
+            destination, (jmsOut.getJmsConnectionFactory() == null ?
+            JMSConstants.CACHE_NONE : 
jmsOut.getJmsConnectionFactory().getCacheLevel()), false,
+            destType == -1 ? null : destType == JMSConstants.QUEUE ? 
Boolean.TRUE : Boolean.FALSE);
     }
 
     /**

Modified: 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
--- 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
 (original)
+++ 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
 Sun Dec  7 20:48:21 2008
@@ -32,10 +32,7 @@
 import javax.transaction.UserTransaction;
 import javax.transaction.NotSupportedException;
 import javax.transaction.SystemException;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Collections;
-import java.util.ArrayList;
+import java.util.*;
 
 /**
  * Each service will have one ServiceTaskManager instance that will create, 
manage and also destroy
@@ -116,8 +113,8 @@
     /** Upper limit on reconnection attempt duration */
     private long maxReconnectDuration = 1000 * 60 * 60; // 1 hour
 
-    /** The JNDI context properties */
-    private Hashtable<String,String> jndiProperties = null;
+    /** The JNDI context properties and other general properties */
+    private Hashtable<String,String> jmsProperties = new Hashtable<String, 
String>();
     /** The JNDI Context acuired */
     private Context context = null;
     /** The ConnectionFactory to be used */
@@ -303,6 +300,20 @@
     }
 
     /**
+     * Get the number of MessageListenerTasks that are currently connected to 
the JMS provider
+     * @return connected task count
+     */
+    private int getConnectedTaskCount() {
+        int count = 0;
+        for (MessageListenerTask lstTask : pollingTasks) {
+            if (lstTask.isConnected()) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    /**
      * The actual threads/tasks that perform message polling
      */
     private class MessageListenerTask implements Runnable, ExceptionListener {
@@ -319,6 +330,8 @@
         private int idleExecutionCount = 0;
         /** Is this task idle right now? */
         private volatile boolean idle = false;
+        /** Is this task connected to the JMS provider successfully? */
+        private boolean connected = false;
 
         /** As soon as we create a new polling task, add it to the STM for 
control later */
         MessageListenerTask() {
@@ -369,49 +382,58 @@
                 log.debug("New poll task starting : thread id = " + 
Thread.currentThread().getId());
             }
 
-            while (isActive() &&
-                (getMaxMessagesPerTask() < 0 || messageCount < 
getMaxMessagesPerTask()) &&
-                (getConcurrentConsumers() == 1 || idleExecutionCount < 
getIdleTaskExecutionLimit())) {
+            try {
+                while (isActive() &&
+                    (getMaxMessagesPerTask() < 0 || messageCount < 
getMaxMessagesPerTask()) &&
+                    (getConcurrentConsumers() == 1 || idleExecutionCount < 
getIdleTaskExecutionLimit())) {
 
-                UserTransaction ut = null;
-                try {
-                    if (transactionality == BaseConstants.TRANSACTION_JTA) {
-                        ut = getUserTransaction();
-                        ut.begin();
+                    UserTransaction ut = null;
+                    try {
+                        if (transactionality == BaseConstants.TRANSACTION_JTA) 
{
+                            ut = getUserTransaction();
+                            ut.begin();
+                        }
+                    } catch (NotSupportedException e) {
+                        handleException("Listener Task is already associated 
with a transaction", e);
+                    } catch (SystemException e) {
+                        handleException("Error starting a JTA transaction", e);
                     }
-                } catch (NotSupportedException e) {
-                    handleException("Listener Task is already associated with 
a transaction", e);
-                } catch (SystemException e) {
-                    handleException("Error starting a JTA transaction", e);
-                }
 
-                // Get a message by polling, or receive null
-                Message message = receiveMessage();
+                    // Get a message by polling, or receive null
+                    Message message = receiveMessage();
+
+                    if (log.isTraceEnabled()) {
+                        if (message != null) {
+                            try {
+                                log.trace("<<<<<<< READ message with Message 
ID : " +
+                                    message.getJMSMessageID() + " from : " + 
destination +
+                                    " by Thread ID : " + 
Thread.currentThread().getId());
+                            } catch (JMSException ignore) {}
+                        } else {
+                            log.trace("No message received by Thread ID : " +
+                                Thread.currentThread().getId() + " for 
destination : " + destination);
+                        }
+                    }
 
-                if (log.isTraceEnabled()) {
                     if (message != null) {
-                        try {
-                            log.trace("<<<<<<< READ message with Message ID : 
" +
-                                message.getJMSMessageID() + " from : " + 
destination +
-                                " by Thread ID : " + 
Thread.currentThread().getId());
-                        } catch (JMSException ignore) {}
+                        idle = false;
+                        idleExecutionCount = 0;
+                        messageCount++;
+                        // I will be busy now while processing this message, 
so start another if needed
+                        scheduleNewTaskIfAppropriate();
+                        handleMessage(message, ut);
+
                     } else {
-                        log.trace("No message received by Thread ID : " +
-                            Thread.currentThread().getId() + " for destination 
: " + destination);
+                        idle = true;
+                        idleExecutionCount++;
                     }
                 }
 
-                if (message != null) {
-                    idle = false;
-                    idleExecutionCount = 0;
-                    messageCount++;
-                    // I will be busy now while processing this message, so 
start another if needed
-                    scheduleNewTaskIfAppropriate();
-                    handleMessage(message, ut);
-
-                } else {
-                    idle = true;
-                    idleExecutionCount++;
+            } finally {
+                workerState = STATE_STOPPED;
+                activeTaskCount--;
+                synchronized(pollingTasks) {
+                    pollingTasks.remove(this);
                 }
             }
 
@@ -431,10 +453,6 @@
             closeSession(true);
             closeConnection();
 
-            activeTaskCount--;
-            synchronized(pollingTasks) {
-                pollingTasks.remove(this);
-            }
             // My time is up, so if I am going away, create another
             scheduleNewTaskIfAppropriate();
         }
@@ -561,9 +579,13 @@
         public void onException(JMSException j) {
 
             if (!isSTMActive()) {
+                requestShutdown();
                 return;
             }
 
+            log.warn("JMS Connection failure : " + j.getMessage());
+            setConnected(false);
+
             if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
                 // failed Connection was not shared, thus no need to restart 
the whole STM
                 requestShutdown();
@@ -572,7 +594,7 @@
 
             // if we failed while active, update state to show failure
             setServiceTaskManagerState(STATE_FAILURE);
-            log.error("JMS Connection failed : " + j.getMessage() + " - 
shutting down worker tasks", j);
+            log.error("JMS Connection failed : " + j.getMessage() + " - 
shutting down worker tasks");
 
             int r = 1;
             long retryDuration = initialReconnectDuration;
@@ -581,9 +603,22 @@
                 try {
                     log.info("Reconnection attempt : " + r + " for service : " 
+ serviceName);
                     start();
-                } catch (Exception e) {
+                } catch (Exception ignore) {}
+
+                boolean connected = false;
+                for (int i=0; i<5; i++) {
+                    if (getConnectedTaskCount() == concurrentConsumers) {
+                        connected = true;
+                        break;
+                    }
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException ignore) {}
+                }
+
+                if (!connected) {
                     log.error("Reconnection attempt : " + (r++) + " for 
service : " + serviceName +
-                        " failed. Next retry in " + (retryDuration/1000) + 
"seconds", e);
+                        " failed. Next retry in " + (retryDuration/1000) + 
"seconds");
                     retryDuration = (long) (retryDuration * 
reconnectionProgressionFactor);
                     if (retryDuration > maxReconnectDuration) {
                         retryDuration = maxReconnectDuration;
@@ -593,7 +628,8 @@
                         Thread.sleep(retryDuration);
                     } catch (InterruptedException ignore) {}
                 }
-            } while (!isSTMActive());
+
+            } while (!isSTMActive() || getConnectedTaskCount() < 
concurrentConsumers);
         }
 
         protected void requestShutdown() {
@@ -608,6 +644,14 @@
             return idle;
         }
 
+        public boolean isConnected() {
+            return connected;
+        }
+
+        public void setConnected(boolean connected) {
+            this.connected = connected;
+        }
+
         /**
          * Get a Connection that could/should be used by this task - depends 
on the cache level to reuse
          * @return the shared Connection if cache level is higher than 
CACHE_NONE, or a new Connection
@@ -630,6 +674,7 @@
                     }
                 }
             }
+            setConnected(true);
             return connection;
         }
 
@@ -733,15 +778,15 @@
                 log.info("Connected to the JMS connection factory : " + 
getConnFactoryJNDIName());
             } catch (NamingException e) {
                 handleException("Error looking up connection factory : " + 
getConnFactoryJNDIName() +
-                    " using JNDI properties : " + jndiProperties, e);
+                    " using JNDI properties : " + jmsProperties, e);
             }
 
             Connection connection = null;
             try {
                 connection = JMSUtils.createConnection(
                     conFactory,
-                    jndiProperties.get(Context.SECURITY_PRINCIPAL),
-                    jndiProperties.get(Context.SECURITY_CREDENTIALS),
+                    jmsProperties.get(JMSConstants.PARAM_JMS_USERNAME),
+                    jmsProperties.get(JMSConstants.PARAM_JMS_PASSWORD),
                     isJmsSpec11(), isQueue());
 
                 connection.setExceptionListener(this);
@@ -750,7 +795,7 @@
 
             } catch (JMSException e) {
                 handleException("Error acquiring a JMS connection to : " + 
getConnFactoryJNDIName() +
-                    " using JNDI properties : " + jndiProperties, e);
+                    " using JNDI properties : " + jmsProperties, e);
             }
             return connection;
         }
@@ -786,7 +831,7 @@
                 }
 
                 return JMSUtils.createConsumer(
-                    session, getDestination(), isQueue(),
+                    session, getDestination(session), isQueue(),
                     (isSubscriptionDurable() && getDurableSubscriberName() == 
null ?
                         getDurableSubscriberName() : serviceName),
                     getMessageSelector(), isPubSubNoLocal(), 
isSubscriptionDurable(), isJmsSpec11());
@@ -806,7 +851,7 @@
      */
     private Context getInitialContext() throws NamingException {
         if (context == null) {
-            context = new InitialContext(jndiProperties);
+            context = new InitialContext(jmsProperties);
         }
         return context;
     }
@@ -815,7 +860,7 @@
      * Return the JMS Destination for the JNDI name of the Destination from 
the InitialContext
      * @return the JMS Destination to which this STM listens for messages
      */
-    private Destination getDestination() {
+    private Destination getDestination(Session session) {
         if (destination == null) {
             try {
                 context = getInitialContext();
@@ -825,8 +870,26 @@
                         " found for service " + serviceName);
                 }
             } catch (NamingException e) {
-                handleException("Error looking up JMS destination : " + 
getDestinationJNDIName() +
-                    " using JNDI properties : " + jndiProperties, e);
+                try {
+                    switch (destinationType) {
+                        case JMSConstants.QUEUE: {
+                            destination = 
session.createQueue(getDestinationJNDIName());
+                            break;
+                        }
+                        case JMSConstants.TOPIC: {
+                            destination = 
session.createTopic(getDestinationJNDIName());
+                            break;
+                        }
+                        default: {
+                            handleException("Error looking up JMS destination 
: " +
+                                getDestinationJNDIName() + " using JNDI 
properties : " +
+                                jmsProperties, e);
+                        }
+                    }
+                } catch (JMSException j) {
+                    handleException("Error looking up and creating JMS 
destination : " +
+                        getDestinationJNDIName() + " using JNDI properties : " 
+ jmsProperties, e);
+                }
             }
         }
         return destination;
@@ -848,7 +911,7 @@
                     JMSUtils.lookup(context, UserTransaction.class, 
getUserTransactionJNDIName());
             } catch (NamingException e) {
                 handleException("Error looking up UserTransaction : " + 
getDestinationJNDIName() +
-                    " using JNDI properties : " + jndiProperties, e);
+                    " using JNDI properties : " + jmsProperties, e);
             }
         }
         
@@ -862,7 +925,7 @@
                 }
             } catch (NamingException e) {
                 handleException("Error looking up UserTransaction : " + 
getDestinationJNDIName() +
-                    " using JNDI properties : " + jndiProperties, e);
+                    " using JNDI properties : " + jmsProperties, e);
             }
         }
         return sharedUserTransaction;
@@ -1100,12 +1163,16 @@
         this.jmsSpec11 = jmsSpec11;
     }
 
-    public Hashtable<String, String> getJndiProperties() {
-        return jndiProperties;
+    public Hashtable<String, String> getJmsProperties() {
+        return jmsProperties;
+    }
+
+    public void addJmsProperties(Map<String, String> jmsProperties) {
+        this.jmsProperties.putAll(jmsProperties);
     }
 
-    public void setJndiProperties(Hashtable<String, String> jndiProperties) {
-        this.jndiProperties = jndiProperties;
+    public void removeJmsProperties(String key) {
+        this.jmsProperties.remove(key);
     }
 
     public Context getContext() {

Modified: 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html?rev=724253&r1=724252&r2=724253&view=diff
==============================================================================
--- 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html
 (original)
+++ 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html
 Sun Dec  7 20:48:21 2008
@@ -97,6 +97,9 @@
 transport.UserTxnJNDIName
 transport.CacheUserTxn - true | false
 
+transport.jms.UserName - user name to use when creating a new JMS Connection
+transport.jms.Password - password to use when creating a new JMS Connection
+
 
 
 transport.jms.PublishEPR - one or more EPR's could be specified. If none 
specified, defaults to


Reply via email to