Author: asankha Date: Sun Dec 7 20:48:21 2008 New Revision: 724253 URL: http://svn.apache.org/viewvc?rev=724253&view=rev Log: AXIS2-4164 - Add two new parameters to specify JMS username and password (transport.jms.UserName and transport.jms.Password). Fix issue with the generation of the WSDL EPR. Correctly call JMSMessageSender.close() after a send operation, and release JMS resources as appropriate. Support ActiveMQ dynamicQueues/ and dynamicTopics/ prefixes for convenience, since most of the demonstrations will be against AMQ. Test on connection termination/exceptions and re-connection - probably could refine this logic a bit more.
Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java?rev=724253&r1=724252&r2=724253&view=diff ============================================================================== --- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java (original) +++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java Sun Dec 7 20:48:21 2008 @@ -254,8 +254,8 @@ try { connection = JMSUtils.createConnection( conFactory, - 
parameters.get(Context.SECURITY_PRINCIPAL), - parameters.get(Context.SECURITY_CREDENTIALS), + parameters.get(JMSConstants.PARAM_JMS_USERNAME), + parameters.get(JMSConstants.PARAM_JMS_PASSWORD), isJmsSpec11(), isQueue()); if (log.isDebugEnabled()) { Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java?rev=724253&r1=724252&r2=724253&view=diff ============================================================================== --- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java (original) +++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java Sun Dec 7 20:48:21 2008 @@ -188,6 +188,11 @@ /** @see PARAM_RECON_INIT_DURATION */ public static final String PARAM_RECON_MAX_DURATION = "transport.jms.MaxReconnectDuration"; + /** The username to use when obtaining a JMS Connection */ + public static final String PARAM_JMS_USERNAME = "transport.jms.UserName"; + /** The password to use when obtaining a JMS Connection */ + public static final String PARAM_JMS_PASSWORD = "transport.jms.Password"; + //-------------- message context / transport header properties and client options -------------- /** * A MessageContext property or client Option indicating the JMS message type Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java?rev=724253&r1=724252&r2=724253&view=diff ============================================================================== --- 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java (original) +++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java Sun Dec 7 20:48:21 2008 @@ -122,8 +122,7 @@ JMSEndpoint endpoint = new JMSEndpoint(); endpoint.setService(service); endpoint.setCf(cf); - endpoint.computeEPRs(); // compute service EPR and keep for later use - + Parameter destParam = service.getParameter(JMSConstants.PARAM_DESTINATION); if (destParam != null) { endpoint.setJndiDestinationName((String)destParam.getValue()); @@ -135,7 +134,7 @@ Parameter destTypeParam = service.getParameter(JMSConstants.PARAM_DEST_TYPE); if (destTypeParam != null) { String paramValue = (String) destTypeParam.getValue(); - if(JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) || + if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) || JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) { endpoint.setDestinationType(paramValue); } else { @@ -156,6 +155,8 @@ } else { endpoint.setContentTypeRuleSet(ContentTypeRuleFactory.parse(contentTypeParam)); } + + endpoint.computeEPRs(); // compute service EPR and keep for later use serviceNameToEndpointMap.put(service.getName(), endpoint); ServiceTaskManager stm = JMSUtils.createTaskManagerForService(cf, service, workerPool); Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java?rev=724253&r1=724252&r2=724253&view=diff ============================================================================== --- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java (original) +++ 
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java Sun Dec 7 20:48:21 2008 @@ -230,27 +230,33 @@ * Close non-shared producer, session and connection if any */ public void close() { - if (cacheLevel < JMSConstants.CACHE_PRODUCER) { + if (producer != null && cacheLevel < JMSConstants.CACHE_PRODUCER) { try { producer.close(); } catch (JMSException e) { log.error("Error closing JMS MessageProducer after send", e); + } finally { + producer = null; } } - if (cacheLevel < JMSConstants.CACHE_SESSION) { + if (session != null && cacheLevel < JMSConstants.CACHE_SESSION) { try { session.close(); } catch (JMSException e) { log.error("Error closing JMS Session after send", e); + } finally { + session = null; } } - if (cacheLevel < JMSConstants.CACHE_CONNECTION) { + if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { try { connection.close(); } catch (JMSException e) { log.error("Error closing JMS Connection after send", e); + } finally { + connection = null; } } } Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java?rev=724253&r1=724252&r2=724253&view=diff ============================================================================== --- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java (original) +++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java Sun Dec 7 20:48:21 2008 @@ -168,8 +168,12 @@ try { return JMSUtils.lookup(context, Destination.class, destinationName); } catch (NameNotFoundException e) { - if (log.isDebugEnabled()) { - log.debug("Cannot locate 
destination : " + destinationName + " using " + url); + try { + return JMSUtils.lookup(context, Destination.class, + (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ? + "dynamicTopics/" : "dynamicQueues/") + destinationName); + } catch (NamingException x) { + handleException("Cannot locate destination : " + destinationName + " using " + url); } } catch (NamingException e) { handleException("Cannot locate destination : " + destinationName + " using " + url, e); Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=724253&r1=724252&r2=724253&view=diff ============================================================================== --- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original) +++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Sun Dec 7 20:48:21 2008 @@ -72,9 +72,9 @@ */ private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) { Map<String,String> props = trpInfo.getProperties(); - if(trpInfo.getProperties() != null) { + if (trpInfo.getProperties() != null) { String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC); - if(jmsConnectionFactoryName != null) { + if (jmsConnectionFactoryName != null) { return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName); } else { return connFacManager.getJMSConnectionFactory(props); @@ -134,7 +134,11 @@ // need to synchronize as Sessions are not thread safe synchronized (messageSender.getSession()) { - sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut); + try { + sendOverJMS(msgCtx, messageSender, contentTypeProperty, 
jmsConnectionFactory, jmsOut); + } finally { + messageSender.close(); + } } } Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java?rev=724253&r1=724252&r2=724253&view=diff ============================================================================== --- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java (original) +++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java Sun Dec 7 20:48:21 2008 @@ -80,7 +80,8 @@ * Get the EPR for the given JMS connection factory and destination * the form of the URL is * jms:/<destination>?[<key>=<value>&]* - * Credentials Context.SECURITY_PRINCIPAL and Context.SECURITY_CREDENTIALS are filtered + * Credentials Context.SECURITY_PRINCIPAL, Context.SECURITY_CREDENTIALS + * JMSConstants.PARAM_JMS_USERNAME and JMSConstants.PARAM_JMS_PASSWORD are filtered * * @param cf the Axis2 JMS connection factory * @param destinationType the type of destination @@ -110,7 +111,9 @@ for (Map.Entry<String,String> entry : cf.getParameters().entrySet()) { if (!Context.SECURITY_PRINCIPAL.equalsIgnoreCase(entry.getKey()) && - !Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey())) { + !Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey()) && + !JMSConstants.PARAM_JMS_USERNAME.equalsIgnoreCase(entry.getKey()) && + !JMSConstants.PARAM_JMS_PASSWORD.equalsIgnoreCase(entry.getKey())) { sb.append("&").append( entry.getKey()).append("=").append(entry.getValue()); } @@ -584,12 +587,16 @@ ServiceTaskManager stm = new ServiceTaskManager(); stm.setServiceName(name); - stm.setJndiProperties(jcf.getParameters()); + stm.addJmsProperties(cf); + stm.addJmsProperties(svc); 
stm.setConnFactoryJNDIName( getRqdStringProperty(JMSConstants.PARAM_CONFAC_JNDI_NAME, svc, cf)); - stm.setDestinationJNDIName( - getRqdStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf)); + String destName = getOptionalStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf); + if (destName == null) { + destName = service.getName(); + } + stm.setDestinationJNDIName(destName); stm.setDestinationType(getDestinationType(svc, cf)); stm.setJmsSpec11( @@ -624,7 +631,7 @@ if (value != null) { stm.setConcurrentConsumers(value); } - value = getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf); + value = getOptionalIntProperty(JMSConstants.PARAM_MAX_CONSUMERS, svc, cf); if (value != null) { stm.setMaxConcurrentConsumers(value); } @@ -651,6 +658,29 @@ } stm.setWorkerPool(workerPool); + + // remove processed properties from property bag + stm.removeJmsProperties(JMSConstants.PARAM_CONFAC_JNDI_NAME); + stm.removeJmsProperties(JMSConstants.PARAM_DESTINATION); + stm.removeJmsProperties(JMSConstants.PARAM_JMS_SPEC_VER); + stm.removeJmsProperties(BaseConstants.PARAM_TRANSACTIONALITY); + stm.removeJmsProperties(BaseConstants.PARAM_CACHE_USER_TXN); + stm.removeJmsProperties(BaseConstants.PARAM_USER_TXN_JNDI_NAME); + stm.removeJmsProperties(JMSConstants.PARAM_SESSION_TRANSACTED); + stm.removeJmsProperties(JMSConstants.PARAM_MSG_SELECTOR); + stm.removeJmsProperties(JMSConstants.PARAM_SUB_DURABLE); + stm.removeJmsProperties(JMSConstants.PARAM_DURABLE_SUB_NAME); + stm.removeJmsProperties(JMSConstants.PARAM_CACHE_LEVEL); + stm.removeJmsProperties(JMSConstants.PARAM_PUBSUB_NO_LOCAL); + stm.removeJmsProperties(JMSConstants.PARAM_RCV_TIMEOUT); + stm.removeJmsProperties(JMSConstants.PARAM_CONCURRENT_CONSUMERS); + stm.removeJmsProperties(JMSConstants.PARAM_MAX_CONSUMERS); + stm.removeJmsProperties(JMSConstants.PARAM_IDLE_TASK_LIMIT); + stm.removeJmsProperties(JMSConstants.PARAM_MAX_MSGS_PER_TASK); + stm.removeJmsProperties(JMSConstants.PARAM_RECON_INIT_DURATION); + 
stm.removeJmsProperties(JMSConstants.PARAM_RECON_MAX_DURATION); + stm.removeJmsProperties(JMSConstants.PARAM_RECON_FACTOR); + return stm; } @@ -990,9 +1020,9 @@ jmsOut.loadConnectionFactoryFromProperies(); // create a one time connection and session to be used - Hashtable<String,String> jndiProps = jmsOut.getProperties(); - String user = jndiProps != null ? jndiProps.get(Context.SECURITY_PRINCIPAL) : null; - String pass = jndiProps != null ? jndiProps.get(Context.SECURITY_CREDENTIALS) : null; + Hashtable<String,String> jmsProps = jmsOut.getProperties(); + String user = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_USERNAME) : null; + String pass = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_PASSWORD) : null; QueueConnectionFactory qConFac = null; TopicConnectionFactory tConFac = null; @@ -1041,8 +1071,9 @@ } return new JMSMessageSender(connection, session, producer, - destination, JMSConstants.CACHE_NONE, false, - destType == -1 ? null : destType == JMSConstants.QUEUE ? Boolean.TRUE : Boolean.FALSE); + destination, (jmsOut.getJmsConnectionFactory() == null ? + JMSConstants.CACHE_NONE : jmsOut.getJmsConnectionFactory().getCacheLevel()), false, + destType == -1 ? null : destType == JMSConstants.QUEUE ? 
Boolean.TRUE : Boolean.FALSE); } /** Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java?rev=724253&r1=724252&r2=724253&view=diff ============================================================================== --- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java (original) +++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java Sun Dec 7 20:48:21 2008 @@ -32,10 +32,7 @@ import javax.transaction.UserTransaction; import javax.transaction.NotSupportedException; import javax.transaction.SystemException; -import java.util.Hashtable; -import java.util.List; -import java.util.Collections; -import java.util.ArrayList; +import java.util.*; /** * Each service will have one ServiceTaskManager instance that will create, manage and also destroy @@ -116,8 +113,8 @@ /** Upper limit on reconnection attempt duration */ private long maxReconnectDuration = 1000 * 60 * 60; // 1 hour - /** The JNDI context properties */ - private Hashtable<String,String> jndiProperties = null; + /** The JNDI context properties and other general properties */ + private Hashtable<String,String> jmsProperties = new Hashtable<String, String>(); /** The JNDI Context acuired */ private Context context = null; /** The ConnectionFactory to be used */ @@ -303,6 +300,20 @@ } /** + * Get the number of MessageListenerTasks that are currently connected to the JMS provider + * @return connected task count + */ + private int getConnectedTaskCount() { + int count = 0; + for (MessageListenerTask lstTask : pollingTasks) { + if (lstTask.isConnected()) { + count++; + } + } + return count; + } + + /** * The 
actual threads/tasks that perform message polling */ private class MessageListenerTask implements Runnable, ExceptionListener { @@ -319,6 +330,8 @@ private int idleExecutionCount = 0; /** Is this task idle right now? */ private volatile boolean idle = false; + /** Is this task connected to the JMS provider successfully? */ + private boolean connected = false; /** As soon as we create a new polling task, add it to the STM for control later */ MessageListenerTask() { @@ -369,49 +382,58 @@ log.debug("New poll task starting : thread id = " + Thread.currentThread().getId()); } - while (isActive() && - (getMaxMessagesPerTask() < 0 || messageCount < getMaxMessagesPerTask()) && - (getConcurrentConsumers() == 1 || idleExecutionCount < getIdleTaskExecutionLimit())) { + try { + while (isActive() && + (getMaxMessagesPerTask() < 0 || messageCount < getMaxMessagesPerTask()) && + (getConcurrentConsumers() == 1 || idleExecutionCount < getIdleTaskExecutionLimit())) { - UserTransaction ut = null; - try { - if (transactionality == BaseConstants.TRANSACTION_JTA) { - ut = getUserTransaction(); - ut.begin(); + UserTransaction ut = null; + try { + if (transactionality == BaseConstants.TRANSACTION_JTA) { + ut = getUserTransaction(); + ut.begin(); + } + } catch (NotSupportedException e) { + handleException("Listener Task is already associated with a transaction", e); + } catch (SystemException e) { + handleException("Error starting a JTA transaction", e); } - } catch (NotSupportedException e) { - handleException("Listener Task is already associated with a transaction", e); - } catch (SystemException e) { - handleException("Error starting a JTA transaction", e); - } - // Get a message by polling, or receive null - Message message = receiveMessage(); + // Get a message by polling, or receive null + Message message = receiveMessage(); + + if (log.isTraceEnabled()) { + if (message != null) { + try { + log.trace("<<<<<<< READ message with Message ID : " + + message.getJMSMessageID() + " from : 
" + destination + + " by Thread ID : " + Thread.currentThread().getId()); + } catch (JMSException ignore) {} + } else { + log.trace("No message received by Thread ID : " + + Thread.currentThread().getId() + " for destination : " + destination); + } + } - if (log.isTraceEnabled()) { if (message != null) { - try { - log.trace("<<<<<<< READ message with Message ID : " + - message.getJMSMessageID() + " from : " + destination + - " by Thread ID : " + Thread.currentThread().getId()); - } catch (JMSException ignore) {} + idle = false; + idleExecutionCount = 0; + messageCount++; + // I will be busy now while processing this message, so start another if needed + scheduleNewTaskIfAppropriate(); + handleMessage(message, ut); + } else { - log.trace("No message received by Thread ID : " + - Thread.currentThread().getId() + " for destination : " + destination); + idle = true; + idleExecutionCount++; } } - if (message != null) { - idle = false; - idleExecutionCount = 0; - messageCount++; - // I will be busy now while processing this message, so start another if needed - scheduleNewTaskIfAppropriate(); - handleMessage(message, ut); - - } else { - idle = true; - idleExecutionCount++; + } finally { + workerState = STATE_STOPPED; + activeTaskCount--; + synchronized(pollingTasks) { + pollingTasks.remove(this); } } @@ -431,10 +453,6 @@ closeSession(true); closeConnection(); - activeTaskCount--; - synchronized(pollingTasks) { - pollingTasks.remove(this); - } // My time is up, so if I am going away, create another scheduleNewTaskIfAppropriate(); } @@ -561,9 +579,13 @@ public void onException(JMSException j) { if (!isSTMActive()) { + requestShutdown(); return; } + log.warn("JMS Connection failure : " + j.getMessage()); + setConnected(false); + if (cacheLevel < JMSConstants.CACHE_CONNECTION) { // failed Connection was not shared, thus no need to restart the whole STM requestShutdown(); @@ -572,7 +594,7 @@ // if we failed while active, update state to show failure 
setServiceTaskManagerState(STATE_FAILURE); - log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks", j); + log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks"); int r = 1; long retryDuration = initialReconnectDuration; @@ -581,9 +603,22 @@ try { log.info("Reconnection attempt : " + r + " for service : " + serviceName); start(); - } catch (Exception e) { + } catch (Exception ignore) {} + + boolean connected = false; + for (int i=0; i<5; i++) { + if (getConnectedTaskCount() == concurrentConsumers) { + connected = true; + break; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) {} + } + + if (!connected) { log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName + - " failed. Next retry in " + (retryDuration/1000) + "seconds", e); + " failed. Next retry in " + (retryDuration/1000) + "seconds"); retryDuration = (long) (retryDuration * reconnectionProgressionFactor); if (retryDuration > maxReconnectDuration) { retryDuration = maxReconnectDuration; @@ -593,7 +628,8 @@ Thread.sleep(retryDuration); } catch (InterruptedException ignore) {} } - } while (!isSTMActive()); + + } while (!isSTMActive() || getConnectedTaskCount() < concurrentConsumers); } protected void requestShutdown() { @@ -608,6 +644,14 @@ return idle; } + public boolean isConnected() { + return connected; + } + + public void setConnected(boolean connected) { + this.connected = connected; + } + /** * Get a Connection that could/should be used by this task - depends on the cache level to reuse * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection @@ -630,6 +674,7 @@ } } } + setConnected(true); return connection; } @@ -733,15 +778,15 @@ log.info("Connected to the JMS connection factory : " + getConnFactoryJNDIName()); } catch (NamingException e) { handleException("Error looking up connection factory : " + getConnFactoryJNDIName() + - " using JNDI properties : " 
+ jndiProperties, e); + " using JNDI properties : " + jmsProperties, e); } Connection connection = null; try { connection = JMSUtils.createConnection( conFactory, - jndiProperties.get(Context.SECURITY_PRINCIPAL), - jndiProperties.get(Context.SECURITY_CREDENTIALS), + jmsProperties.get(JMSConstants.PARAM_JMS_USERNAME), + jmsProperties.get(JMSConstants.PARAM_JMS_PASSWORD), isJmsSpec11(), isQueue()); connection.setExceptionListener(this); @@ -750,7 +795,7 @@ } catch (JMSException e) { handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() + - " using JNDI properties : " + jndiProperties, e); + " using JNDI properties : " + jmsProperties, e); } return connection; } @@ -786,7 +831,7 @@ } return JMSUtils.createConsumer( - session, getDestination(), isQueue(), + session, getDestination(session), isQueue(), (isSubscriptionDurable() && getDurableSubscriberName() == null ? getDurableSubscriberName() : serviceName), getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11()); @@ -806,7 +851,7 @@ */ private Context getInitialContext() throws NamingException { if (context == null) { - context = new InitialContext(jndiProperties); + context = new InitialContext(jmsProperties); } return context; } @@ -815,7 +860,7 @@ * Return the JMS Destination for the JNDI name of the Destination from the InitialContext * @return the JMS Destination to which this STM listens for messages */ - private Destination getDestination() { + private Destination getDestination(Session session) { if (destination == null) { try { context = getInitialContext(); @@ -825,8 +870,26 @@ " found for service " + serviceName); } } catch (NamingException e) { - handleException("Error looking up JMS destination : " + getDestinationJNDIName() + - " using JNDI properties : " + jndiProperties, e); + try { + switch (destinationType) { + case JMSConstants.QUEUE: { + destination = session.createQueue(getDestinationJNDIName()); + break; + } + case JMSConstants.TOPIC: { + 
destination = session.createTopic(getDestinationJNDIName()); + break; + } + default: { + handleException("Error looking up JMS destination : " + + getDestinationJNDIName() + " using JNDI properties : " + + jmsProperties, e); + } + } + } catch (JMSException j) { + handleException("Error looking up and creating JMS destination : " + + getDestinationJNDIName() + " using JNDI properties : " + jmsProperties, e); + } } } return destination; @@ -848,7 +911,7 @@ JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName()); } catch (NamingException e) { handleException("Error looking up UserTransaction : " + getDestinationJNDIName() + - " using JNDI properties : " + jndiProperties, e); + " using JNDI properties : " + jmsProperties, e); } } @@ -862,7 +925,7 @@ } } catch (NamingException e) { handleException("Error looking up UserTransaction : " + getDestinationJNDIName() + - " using JNDI properties : " + jndiProperties, e); + " using JNDI properties : " + jmsProperties, e); } } return sharedUserTransaction; @@ -1100,12 +1163,16 @@ this.jmsSpec11 = jmsSpec11; } - public Hashtable<String, String> getJndiProperties() { - return jndiProperties; + public Hashtable<String, String> getJmsProperties() { + return jmsProperties; + } + + public void addJmsProperties(Map<String, String> jmsProperties) { + this.jmsProperties.putAll(jmsProperties); } - public void setJndiProperties(Hashtable<String, String> jndiProperties) { - this.jndiProperties = jndiProperties; + public void removeJmsProperties(String key) { + this.jmsProperties.remove(key); } public Context getContext() { Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html?rev=724253&r1=724252&r2=724253&view=diff 
============================================================================== --- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html (original) +++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html Sun Dec 7 20:48:21 2008 @@ -97,6 +97,9 @@ transport.UserTxnJNDIName transport.CacheUserTxn - true | false +transport.jms.UserName - user name to use when creating a new JMS Connection +transport.jms.Password - password to use when creating a new JMS Connection + transport.jms.PublishEPR - one or more EPR's could be specified. If none specified, defaults to