Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java?rev=724432&r1=724431&r2=724432&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java (original) +++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java Mon Dec 8 10:15:40 2008 @@ -25,13 +25,14 @@ import org.apache.axis2.context.MessageContext; import org.apache.axis2.description.AxisService; import org.apache.axis2.description.Parameter; -import org.apache.axis2.description.ParameterIncludeImpl; import org.apache.axis2.format.TextMessageBuilder; import org.apache.axis2.format.TextMessageBuilderAdapter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.axis2.transport.TransportUtils; import org.apache.axis2.transport.base.BaseUtils; +import org.apache.axis2.transport.base.BaseConstants; +import org.apache.axis2.transport.base.threads.WorkerPool; import javax.jms.*; import javax.jms.Queue; @@ -55,58 +56,6 @@ private static final Object[] NOPARMS = new Object[] {}; /** - * Create a JMS Queue using the given connection with the JNDI destination name, and return the - * JMS Destination name of the created queue - * - * @param con the JMS Connection to be used - * @param destinationJNDIName the JNDI name of the Queue to be created - * @return the JMS Destination name of the created Queue - * @throws JMSException on error - */ - public static String createJMSQueue(Connection con, String destinationJNDIName) - throws JMSException { - - try { - QueueSession session - = ((QueueConnection) con).createQueueSession(false, 
Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(destinationJNDIName); - log.info("JMS Queue with JNDI name : " + destinationJNDIName + " created"); - return queue.getQueueName(); - - } finally { - try { - con.close(); - } catch (JMSException ignore) {} - } - } - - /** - * Create a JMS Topic using the given connection with the JNDI destination name, and return the - * JMS Destination name of the created queue - * - * @param con the JMS Connection to be used - * @param destinationJNDIName the JNDI name of the Topic to be created - * @return the JMS Destination name of the created Topic - * @throws JMSException on error - */ - public static String createJMSTopic(Connection con, String destinationJNDIName) - throws JMSException { - - try { - TopicSession session - = ((TopicConnection) con).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic(destinationJNDIName); - log.info("JMS Topic with JNDI name : " + destinationJNDIName + " created"); - return topic.getTopicName(); - - } finally { - try { - con.close(); - } catch (JMSException ignore) {} - } - } - - /** * Should this service be enabled over the JMS transport? 
* * @param service the Axis service @@ -131,26 +80,43 @@ * Get the EPR for the given JMS connection factory and destination * the form of the URL is * jms:/<destination>?[<key>=<value>&]* + * Credentials Context.SECURITY_PRINCIPAL, Context.SECURITY_CREDENTIALS, + * JMSConstants.PARAM_JMS_USERNAME and JMSConstants.PARAM_JMS_PASSWORD are filtered * - * @param cf the Axis2 JMS connection factory - * @param destination the JNDI name of the destination + * @param cf the Axis2 JMS connection factory + * @param destinationType the type of destination + * @param endpoint JMSEndpoint * @return the EPR as a String */ - // TODO: duplicate code (see JMSConnectionFactory#getEPRForDestination) - static String getEPR(JMSConnectionFactory cf, JMSEndpoint endpoint) { + static String getEPR(JMSConnectionFactory cf, int destinationType, JMSEndpoint endpoint) { StringBuffer sb = new StringBuffer(); - sb.append(JMSConstants.JMS_PREFIX).append(endpoint.getJndiDestinationName()); - sb.append("?").append(JMSConstants.DEST_PARAM_TYPE); - sb.append("=").append(endpoint.getDestinationType()); - for (Map.Entry<String,String> entry : cf.getJndiProperties().entrySet()) { - sb.append("&").append(entry.getKey()).append("=").append(entry.getValue()); - } - String contentTypeProperty = endpoint.getContentTypeRuleSet().getDefaultContentTypeProperty(); - if (contentTypeProperty != null) { - sb.append("&"); - sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); - sb.append("="); - sb.append(contentTypeProperty); + + sb.append( + JMSConstants.JMS_PREFIX).append(endpoint.getJndiDestinationName()); + sb.append("?"). + append(JMSConstants.PARAM_DEST_TYPE).append("=").append( + destinationType == JMSConstants.TOPIC ? 
+ JMSConstants.DESTINATION_TYPE_TOPIC : JMSConstants.DESTINATION_TYPE_QUEUE); + + if (endpoint.getContentTypeRuleSet() != null) { + String contentTypeProperty = + endpoint.getContentTypeRuleSet().getDefaultContentTypeProperty(); + if (contentTypeProperty != null) { + sb.append("&"); + sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); + sb.append("="); + sb.append(contentTypeProperty); + } + } + + for (Map.Entry<String,String> entry : cf.getParameters().entrySet()) { + if (!Context.SECURITY_PRINCIPAL.equalsIgnoreCase(entry.getKey()) && + !Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey()) && + !JMSConstants.PARAM_JMS_USERNAME.equalsIgnoreCase(entry.getKey()) && + !JMSConstants.PARAM_JMS_PASSWORD.equalsIgnoreCase(entry.getKey())) { + sb.append("&").append( + entry.getKey()).append("=").append(entry.getValue()); + } } return sb.toString(); } @@ -188,59 +154,16 @@ } /** - * Set JNDI properties and any other connection factory parameters to the connection factory - * passed in, looking at the parameter in axis2.xml - * @param param the axis parameter that holds the connection factory settings - * @param jmsConFactory the JMS connection factory to which the parameters should be applied + * Set the SOAPEnvelope to the Axis2 MessageContext, from the JMS Message passed in + * @param message the JMS message read + * @param msgContext the Axis2 MessageContext to be populated + * @param contentType content type for the message + * @throws AxisFault + * @throws JMSException */ - public static void setConnectionFactoryParameters( - Parameter param, JMSConnectionFactory jmsConFactory) { - - ParameterIncludeImpl pi = new ParameterIncludeImpl(); - try { - pi.deserializeParameters((OMElement) param.getValue()); - } catch (AxisFault axisFault) { - log.error("Error reading parameters for JMS connection factory" + - jmsConFactory.getName(), axisFault); - } - - for (Object o : pi.getParameters()) { - - Parameter p = (Parameter) o; - - if 
(JMSConstants.CONFAC_TYPE.equals(p.getName())) { - String connectionFactoryType = (String) p.getValue(); - jmsConFactory.setConnectionFactoryType(connectionFactoryType); - - } else if (JMSConstants.RECONNECT_TIMEOUT.equals(p.getName())) { - String strTimeout = (String) p.getValue(); - int reconnectTimeoutSeconds = Integer.parseInt(strTimeout); - long reconnectTimeoutMillis = reconnectTimeoutSeconds * 1000; - jmsConFactory.setReconnectTimeout(reconnectTimeoutMillis); - - } else if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) { - jmsConFactory.addJNDIContextProperty( - Context.INITIAL_CONTEXT_FACTORY, (String) p.getValue()); - } else if (Context.PROVIDER_URL.equals(p.getName())) { - jmsConFactory.addJNDIContextProperty( - Context.PROVIDER_URL, (String) p.getValue()); - } else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) { - jmsConFactory.addJNDIContextProperty( - Context.SECURITY_PRINCIPAL, (String) p.getValue()); - } else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) { - jmsConFactory.addJNDIContextProperty( - Context.SECURITY_CREDENTIALS, (String) p.getValue()); - } else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) { - jmsConFactory.setConnFactoryJNDIName((String) p.getValue()); - jmsConFactory.addJNDIContextProperty( - JMSConstants.CONFAC_JNDI_NAME_PARAM, (String) p.getValue()); - } else { - jmsConFactory.addJNDIContextProperty( p.getName(), (String) p.getValue()); - } - } - } + public static void setSOAPEnvelope(Message message, MessageContext msgContext, String contentType) + throws AxisFault, JMSException { - public static void setSOAPEnvelope(Message message, MessageContext msgContext, String contentType) throws AxisFault, JMSException { if (message instanceof BytesMessage) { if (contentType == null) { log.debug("No content type specified; assuming application/octet-stream."); @@ -268,6 +191,7 @@ return; // Make compiler happy } msgContext.setEnvelope(envelope); + } else if (message instanceof TextMessage) { String type; 
if (contentType == null) { @@ -288,6 +212,7 @@ } builder = new SOAPBuilder(); } + TextMessageBuilder textMessageBuilder; if (builder instanceof TextMessageBuilder) { textMessageBuilder = (TextMessageBuilder)builder; @@ -312,6 +237,7 @@ */ public static Destination setReplyDestination(Destination replyDestination, Session session, Message message) { + if (replyDestination == null) { try { // create temporary queue to receive the reply @@ -340,116 +266,6 @@ } /** - * When trying to send a message to a destination, if it does not exist, try to create it - * - * @param destination the JMS destination to send messages - * @param destinationType type of the destination (can be a queue or a topic) - * @param targetAddress the target JMS EPR to find the Destination to be created if required - * @param session the JMS session to use - * @return the JMS Destination where messages could be posted - * @throws AxisFault if the target Destination does not exist and cannot be created - */ - public static Destination createDestinationIfRequired(Destination destination, - String destinationType, String targetAddress, Session session) throws AxisFault { - - if (destination == null) { - if (targetAddress != null) { - String name = JMSUtils.getDestination(targetAddress); - if (log.isDebugEnabled()) { - log.debug("Creating JMS Destination : " + name); - } - - try { - destination = createDestination(session, name, destinationType); - } catch (JMSException e) { - handleException("Error creating destination Queue : " + name, e); - } - } else { - handleException("Cannot send reply to null JMS Destination"); - } - } - return destination; - } - - /** - * If reply destination does not exist, try to create it - * - * @param destination the destination queue or topic - * @param replyDestinationName name of the reply destination queue or topic - * @param destinationType type of the destination (can be queue or topic) - * @param targetAddress target address of the queue or topic - * @param 
session JMS session with the message to be sent - * @return destination created if the destination is null or the destination otherwise - * @throws org.apache.axis2.AxisFault in case of an error in creating the destination - */ - public static Destination createReplyDestinationIfRequired(Destination destination, - String replyDestinationName, String destinationType, String targetAddress, Session session) - throws AxisFault { - - if (destination == null) { - if (targetAddress != null) { - if (log.isDebugEnabled()) { - log.debug("Creating JMS Reply Destination : " + replyDestinationName); - } - - try { - destination = createDestination(session, replyDestinationName, destinationType); - } catch (JMSException e) { - handleException("Error creating reply destination : " - + replyDestinationName, e); - } - } else { - handleException("Cannot send reply to null reply JMS Destination"); - } - } - return destination; - } - - /** - * Send the given message to the Destination using the given session - * - * @param session the session to use to send - * @param destination the Destination - * @param destinationType type of the destination (can be a queue or a topic) - * @param message the JMS Message - * @throws AxisFault on error - */ - public static void sendMessageToJMSDestination(Session session, - Destination destination, String destinationType, Message message) throws AxisFault { - - MessageProducer producer = null; - try { - if (log.isDebugEnabled()) { - log.debug("Sending message to destination : " + destination); - } - - if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType)) { - producer = ((TopicSession) session).createPublisher((Topic) destination); - ((TopicPublisher) producer).publish(message); - } else { - producer = ((QueueSession) session).createSender((Queue) destination); - producer.send(message); - } - - if (log.isDebugEnabled()) { - log.debug("Sent message to destination : " + destination + - "\nMessage ID : " + message.getJMSMessageID() + - 
"\nCorrelation ID : " + message.getJMSCorrelationID() + - "\nReplyTo ID : " + message.getJMSReplyTo()); - } - - } catch (JMSException e) { - handleException("Error creating a producer or sending to : " + destination, e); - } finally { - if (producer != null) { - try { - producer.close(); - } catch (JMSException ignore) {} - } - } - } - - /** * Set transport headers from the axis message context, into the JMS message * * @param msgContext the axis message context @@ -485,19 +301,21 @@ } else { log.warn("Invalid delivery mode ignored : " + o); } + } else if (JMSConstants.JMS_EXPIRATION.equals(name)) { message.setJMSExpiration( - Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION))); + Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION))); } else if (JMSConstants.JMS_MESSAGE_ID.equals(name)) { message.setJMSMessageID((String) headerMap.get(JMSConstants.JMS_MESSAGE_ID)); } else if (JMSConstants.JMS_PRIORITY.equals(name)) { message.setJMSPriority( - Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY))); + Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY))); } else if (JMSConstants.JMS_TIMESTAMP.equals(name)) { message.setJMSTimestamp( - Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP))); + Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP))); } else if (JMSConstants.JMS_MESSAGE_TYPE.equals(name)) { message.setJMSType((String) headerMap.get(JMSConstants.JMS_MESSAGE_TYPE)); + } else { Object value = headerMap.get(name); if (value instanceof String) { @@ -644,80 +462,14 @@ } - // ----------- JMS 1.0.2b compatibility methods ------------- - public static Connection createConnection(ConnectionFactory conFactory, String user, - String pass, String destinationType) throws JMSException { - - if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType) ) { - if (user != null && pass != null) { - return ((QueueConnectionFactory) conFactory).createQueueConnection(user, pass); - } else 
{ - return ((QueueConnectionFactory) conFactory).createQueueConnection(); - } - - } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) { - if (user != null && pass != null) { - return ((TopicConnectionFactory) conFactory).createTopicConnection(user, pass); - } else { - return ((TopicConnectionFactory) conFactory).createTopicConnection(); - } - } else { - handleException("Unable to determine type of JMS Connection Factory - i.e Queue/Topic"); - } - return null; - } - - public static Session createSession(Connection con, - boolean transacted, int acknowledgeMode, String destinationType) throws JMSException { - - if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType) ) { - return ((QueueConnection) con).createQueueSession(transacted, acknowledgeMode); - } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) { - return ((TopicConnection) con).createTopicSession(transacted, acknowledgeMode); - } else { - log.debug("JMS destination type not given or invalid, was '" + destinationType + - "'. Taking the default value as queue"); - return ((QueueConnection) con).createQueueSession(transacted, acknowledgeMode); - } - } - - public static Destination createDestination(Session session, String destName, - String destinationType) throws JMSException { - - if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) { - return session.createQueue(destName); - } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) { - return session.createTopic(destName); - } else { - log.debug("JMS destination type not given or invalid, was '" + destinationType + - "'. 
Taking the default value as queue"); - return session.createQueue(destName); - } - } - - public static void createDestination(ConnectionFactory conFactory, - String destinationJNDIName, String destinationType) throws JMSException { - - if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) { - JMSUtils.createJMSQueue( - ((QueueConnectionFactory) conFactory).createQueueConnection(), - destinationJNDIName); - } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) { - JMSUtils.createJMSTopic( - ((TopicConnectionFactory) conFactory).createTopicConnection(), - destinationJNDIName); - } - } - public static MessageConsumer createConsumer(Session session, Destination dest) - throws JMSException { - - if (dest instanceof Queue) { - return ((QueueSession) session).createReceiver((Queue) dest); - } else { - return ((TopicSession) session).createSubscriber((Topic) dest); - } - } - + /** + * Create a MessageConsumer for the given Destination + * @param session JMS Session to use + * @param dest Destination for which the Consumer is to be created + * @param messageSelector the message selector to be used if any + * @return a MessageConsumer for the specified Destination + * @throws JMSException + */ public static MessageConsumer createConsumer(Session session, Destination dest, String messageSelector) throws JMSException { @@ -728,6 +480,13 @@ } } + /** + * Create a temp queue or topic for synchronous receipt of responses, when a reply destination + * is not specified + * @param session the JMS Session to use + * @return a temporary Queue or Topic, depending on the session + * @throws JMSException + */ public static Destination createTemporaryDestination(Session session) throws JMSException { if (session instanceof QueueSession) { @@ -737,6 +496,11 @@ } } + /** + * Return the body length in bytes for a bytes message + * @param bMsg the JMS BytesMessage + * @return length of body in bytes + */ public static long getBodyLength(BytesMessage bMsg) { try { 
Method mtd = bMsg.getClass().getMethod("getBodyLength", NOARGS); @@ -762,7 +526,13 @@ } catch (JMSException ignore) {} return length; } - + + /** + * Get the length of the message in bytes + * @param message + * @return message size (or approximation) in bytes + * @throws JMSException + */ public static long getMessageSize(Message message) throws JMSException { if (message instanceof BytesMessage) { return JMSUtils.getBodyLength((BytesMessage) message); @@ -799,4 +569,525 @@ } } } + + /** + * Create a ServiceTaskManager for the service passed in and its corresponding JMSConnectionFactory + * @param jcf + * @param service + * @param workerPool + * @return + */ + public static ServiceTaskManager createTaskManagerForService(JMSConnectionFactory jcf, + AxisService service, WorkerPool workerPool) { + + String name = service.getName(); + Map<String, String> svc = getServiceStringParameters(service.getParameters()); + Map<String, String> cf = jcf.getParameters(); + + ServiceTaskManager stm = new ServiceTaskManager(); + + stm.setServiceName(name); + stm.addJmsProperties(cf); + stm.addJmsProperties(svc); + + stm.setConnFactoryJNDIName( + getRqdStringProperty(JMSConstants.PARAM_CONFAC_JNDI_NAME, svc, cf)); + String destName = getOptionalStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf); + if (destName == null) { + destName = service.getName(); + } + stm.setDestinationJNDIName(destName); + stm.setDestinationType(getDestinationType(svc, cf)); + + stm.setJmsSpec11( + getJMSSpecVersion(svc, cf)); + stm.setTransactionality( + getTransactionality(svc, cf)); + stm.setCacheUserTransaction( + getOptionalBooleanProperty(BaseConstants.PARAM_CACHE_USER_TXN, svc, cf)); + stm.setUserTransactionJNDIName( + getOptionalStringProperty(BaseConstants.PARAM_USER_TXN_JNDI_NAME, svc, cf)); + stm.setSessionTransacted( + getOptionalBooleanProperty(JMSConstants.PARAM_SESSION_TRANSACTED, svc, cf)); + stm.setSessionAckMode( + getSessionAck(svc, cf)); + stm.setMessageSelector( + 
getOptionalStringProperty(JMSConstants.PARAM_MSG_SELECTOR, svc, cf)); + stm.setSubscriptionDurable( + getOptionalBooleanProperty(JMSConstants.PARAM_SUB_DURABLE, svc, cf)); + stm.setDurableSubscriberName( + getOptionalStringProperty(JMSConstants.PARAM_DURABLE_SUB_NAME, svc, cf)); + + stm.setCacheLevel( + getCacheLevel(svc, cf)); + stm.setPubSubNoLocal( + getOptionalBooleanProperty(JMSConstants.PARAM_PUBSUB_NO_LOCAL, svc, cf)); + + Integer value = getOptionalIntProperty(JMSConstants.PARAM_RCV_TIMEOUT, svc, cf); + if (value != null) { + stm.setReceiveTimeout(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf); + if (value != null) { + stm.setConcurrentConsumers(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_MAX_CONSUMERS, svc, cf); + if (value != null) { + stm.setMaxConcurrentConsumers(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_IDLE_TASK_LIMIT, svc, cf); + if (value != null) { + stm.setIdleTaskExecutionLimit(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_MAX_MSGS_PER_TASK, svc, cf); + if (value != null) { + stm.setMaxMessagesPerTask(value); + } + + value = getOptionalIntProperty(JMSConstants.PARAM_RECON_INIT_DURATION, svc, cf); + if (value != null) { + stm.setInitialReconnectDuration(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_RECON_MAX_DURATION, svc, cf); + if (value != null) { + stm.setMaxReconnectDuration(value); + } + Double dValue = getOptionalDoubleProperty(JMSConstants.PARAM_RECON_FACTOR, svc, cf); + if (dValue != null) { + stm.setReconnectionProgressionFactor(dValue); + } + + stm.setWorkerPool(workerPool); + + // remove processed properties from property bag + stm.removeJmsProperties(JMSConstants.PARAM_CONFAC_JNDI_NAME); + stm.removeJmsProperties(JMSConstants.PARAM_DESTINATION); + stm.removeJmsProperties(JMSConstants.PARAM_JMS_SPEC_VER); + stm.removeJmsProperties(BaseConstants.PARAM_TRANSACTIONALITY); + 
stm.removeJmsProperties(BaseConstants.PARAM_CACHE_USER_TXN); + stm.removeJmsProperties(BaseConstants.PARAM_USER_TXN_JNDI_NAME); + stm.removeJmsProperties(JMSConstants.PARAM_SESSION_TRANSACTED); + stm.removeJmsProperties(JMSConstants.PARAM_MSG_SELECTOR); + stm.removeJmsProperties(JMSConstants.PARAM_SUB_DURABLE); + stm.removeJmsProperties(JMSConstants.PARAM_DURABLE_SUB_NAME); + stm.removeJmsProperties(JMSConstants.PARAM_CACHE_LEVEL); + stm.removeJmsProperties(JMSConstants.PARAM_PUBSUB_NO_LOCAL); + stm.removeJmsProperties(JMSConstants.PARAM_RCV_TIMEOUT); + stm.removeJmsProperties(JMSConstants.PARAM_CONCURRENT_CONSUMERS); + stm.removeJmsProperties(JMSConstants.PARAM_MAX_CONSUMERS); + stm.removeJmsProperties(JMSConstants.PARAM_IDLE_TASK_LIMIT); + stm.removeJmsProperties(JMSConstants.PARAM_MAX_MSGS_PER_TASK); + stm.removeJmsProperties(JMSConstants.PARAM_RECON_INIT_DURATION); + stm.removeJmsProperties(JMSConstants.PARAM_RECON_MAX_DURATION); + stm.removeJmsProperties(JMSConstants.PARAM_RECON_FACTOR); + + return stm; + } + + private static Map<String, String> getServiceStringParameters(List list) { + + Map<String, String> map = new HashMap<String, String>(); + for (Object o : list) { + Parameter p = (Parameter) o; + if (p.getValue() instanceof String) { + map.put(p.getName(), (String) p.getValue()); + } + } + return map; + } + + private static String getRqdStringProperty(String key, Map svcMap, Map cfMap) { + String value = (String) svcMap.get(key); + if (value == null) { + value = (String) cfMap.get(key); + } + if (value == null) { + throw new AxisJMSException("Service/connection factory property : " + key); + } + return value; + } + + private static String getOptionalStringProperty(String key, Map svcMap, Map cfMap) { + String value = (String) svcMap.get(key); + if (value == null) { + value = (String) cfMap.get(key); + } + return value; + } + + private static Boolean getOptionalBooleanProperty(String key, Map svcMap, Map cfMap) { + String value = (String) svcMap.get(key); 
+        if (value == null) {
+            value = (String) cfMap.get(key);
+        }
+        if (value == null) {
+            return null;
+        } else {
+            return Boolean.valueOf(value);
+        }
+    }
+
+    /**
+     * Look up an optional integer property, preferring the service level value over the
+     * connection factory level value
+     * @param key the property name
+     * @param svcMap service level properties
+     * @param cfMap connection factory level properties
+     * @return the parsed value, or null if the property is not set at either level
+     * @throws AxisJMSException if the value found cannot be parsed as an integer
+     */
+    private static Integer getOptionalIntProperty(String key, Map svcMap, Map cfMap) {
+        String value = (String) svcMap.get(key);
+        if (value == null) {
+            value = (String) cfMap.get(key);
+        }
+        if (value != null) {
+            try {
+                return Integer.parseInt(value);
+            } catch (NumberFormatException e) {
+                throw new AxisJMSException("Invalid value : " + value + " for " + key);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Look up an optional floating point property, preferring the service level value over the
+     * connection factory level value
+     * @param key the property name
+     * @param svcMap service level properties
+     * @param cfMap connection factory level properties
+     * @return the parsed value, or null if the property is not set at either level
+     * @throws AxisJMSException if the value found cannot be parsed as a double
+     */
+    private static Double getOptionalDoubleProperty(String key, Map svcMap, Map cfMap) {
+        String value = (String) svcMap.get(key);
+        if (value == null) {
+            value = (String) cfMap.get(key);
+        }
+        if (value != null) {
+            try {
+                return Double.parseDouble(value);
+            } catch (NumberFormatException e) {
+                throw new AxisJMSException("Invalid value : " + value + " for " + key);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Resolve the transactionality configured through BaseConstants.PARAM_TRANSACTIONALITY,
+     * preferring the service level value over the connection factory level value
+     * @param svcMap service level properties
+     * @param cfMap connection factory level properties
+     * @return BaseConstants.TRANSACTION_NONE if the parameter is not set, else
+     * BaseConstants.TRANSACTION_JTA or BaseConstants.TRANSACTION_LOCAL (matched ignoring case)
+     * @throws AxisJMSException if the parameter holds any other value
+     */
+    private static int getTransactionality(Map svcMap, Map cfMap) {
+
+        String key = BaseConstants.PARAM_TRANSACTIONALITY;
+        String val = (String) svcMap.get(key);
+        if (val == null) {
+            val = (String) cfMap.get(key);
+        }
+
+        if (val == null) {
+            return BaseConstants.TRANSACTION_NONE;
+
+        } else {
+            if (BaseConstants.STR_TRANSACTION_JTA.equalsIgnoreCase(val)) {
+                return BaseConstants.TRANSACTION_JTA;
+            } else if (BaseConstants.STR_TRANSACTION_LOCAL.equalsIgnoreCase(val)) {
+                return BaseConstants.TRANSACTION_LOCAL;
+            } else {
+                // report the name of the parameter that was looked up, not one of its values
+                throw new AxisJMSException("Invalid option : " + val + " for parameter : " +
+                    key);
+            }
+        }
+    }
+
+    /**
+     * Resolve the destination type configured through JMSConstants.PARAM_DEST_TYPE,
+     * preferring the service level value over the connection factory level value
+     * @param svcMap service level properties
+     * @param cfMap connection factory level properties
+     * @return JMSConstants.TOPIC if the value equals JMSConstants.DESTINATION_TYPE_TOPIC
+     * ignoring case; JMSConstants.QUEUE for any other value or when the parameter is not set
+     */
+    private static int getDestinationType(Map svcMap, Map cfMap) {
+
+        String key = JMSConstants.PARAM_DEST_TYPE;
+        String val = (String) svcMap.get(key);
+        if (val == null) {
+            val = (String) cfMap.get(key);
+        }
+
+        if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(val)) {
+            return JMSConstants.TOPIC;
+        }
+        return JMSConstants.QUEUE;
+    }
+
+    private static int getSessionAck(Map
svcMap, Map cfMap) {
+
+        String key = JMSConstants.PARAM_SESSION_ACK;
+        String val = (String) svcMap.get(key);
+        if (val == null) {
+            val = (String) cfMap.get(key);
+        }
+
+        // all symbolic names are matched ignoring case; an unset value means AUTO_ACKNOWLEDGE
+        if (val == null || "AUTO_ACKNOWLEDGE".equalsIgnoreCase(val)) {
+            return Session.AUTO_ACKNOWLEDGE;
+        } else if ("CLIENT_ACKNOWLEDGE".equalsIgnoreCase(val)) {
+            return Session.CLIENT_ACKNOWLEDGE;
+        } else if ("DUPS_OK_ACKNOWLEDGE".equalsIgnoreCase(val)) {
+            return Session.DUPS_OK_ACKNOWLEDGE;
+        } else if ("SESSION_TRANSACTED".equalsIgnoreCase(val)) {
+            // NOTE(review): the literal is presumably used instead of the constant so that
+            // this still compiles against a JMS 1.0.2b API jar - confirm
+            return 0; //Session.SESSION_TRANSACTED;
+        } else {
+            // anything else must be a numeric (possibly provider specific) acknowledge mode
+            try {
+                return Integer.parseInt(val);
+            } catch (NumberFormatException ignore) {
+                throw new AxisJMSException("Invalid session acknowledgement mode : " + val);
+            }
+        }
+    }
+
+    /**
+     * Resolve the cache level configured through JMSConstants.PARAM_CACHE_LEVEL, preferring
+     * the service level value over the connection factory level value. The names "none",
+     * "connection", "session" and "consumer" are matched ignoring case
+     * @param svcMap service level properties
+     * @param cfMap connection factory level properties
+     * @return the matching JMSConstants.CACHE_* level, or JMSConstants.CACHE_AUTO if the
+     * parameter is not set
+     * @throws AxisJMSException if the parameter holds an unknown cache level name
+     */
+    private static int getCacheLevel(Map svcMap, Map cfMap) {
+
+        String key = JMSConstants.PARAM_CACHE_LEVEL;
+        String val = (String) svcMap.get(key);
+        if (val == null) {
+            val = (String) cfMap.get(key);
+        }
+
+        if ("none".equalsIgnoreCase(val)) {
+            return JMSConstants.CACHE_NONE;
+        } else if ("connection".equalsIgnoreCase(val)) {
+            return JMSConstants.CACHE_CONNECTION;
+        } else if ("session".equalsIgnoreCase(val)) {
+            return JMSConstants.CACHE_SESSION;
+        } else if ("consumer".equalsIgnoreCase(val)) {
+            return JMSConstants.CACHE_CONSUMER;
+        } else if (val != null) {
+            throw new AxisJMSException("Invalid cache level : " + val);
+        }
+        return JMSConstants.CACHE_AUTO;
+    }
+
+    /**
+     * Decide whether the JMS 1.1 API should be used, based on JMSConstants.PARAM_JMS_SPEC_VER
+     * (service level value preferred over the connection factory level value)
+     * @param svcMap service level properties
+     * @param cfMap connection factory level properties
+     * @return true if the parameter is not set or is exactly "1.1", false for any other value
+     */
+    private static boolean getJMSSpecVersion(Map svcMap, Map cfMap) {
+
+        String key = JMSConstants.PARAM_JMS_SPEC_VER;
+        String val = (String) svcMap.get(key);
+        if (val == null) {
+            val = (String) cfMap.get(key);
+        }
+
+        return val == null || "1.1".equals(val);
+    }
+
+    /**
+     * This is a JMS spec independent method to create a Connection. Please be cautious when
+     * making any changes
+     *
+     * @param conFac the ConnectionFactory to use
+     * @param user optional user name
+     * @param pass optional password
+     * @param jmsSpec11 should we use JMS 1.1 API ?
+     * @param isQueue TRUE for a Queue, FALSE for a Topic; null selects the generic JMS 1.1 API
+     * @return a JMS Connection as requested
+     * @throws JMSException on errors, to be handled and logged by the caller
+     */
+    public static Connection createConnection(ConnectionFactory conFac,
+        String user, String pass, boolean jmsSpec11, Boolean isQueue) throws JMSException {
+
+        if (log.isDebugEnabled()) {
+            // never write the password itself to the log, only whether one was supplied
+            log.debug("Creating a " + (isQueue == null ? "Generic" : isQueue ? "Queue" : "Topic") +
+                "Connection using credentials : (" + user + "/" +
+                (pass != null ? "********" : null) + ")");
+        }
+
+        // credentials are only passed on when both the user name and the password are given
+        boolean useCredentials = user != null && pass != null;
+
+        if (jmsSpec11 || isQueue == null) {
+            // JMS 1.1 unified domain API
+            return useCredentials ?
+                conFac.createConnection(user, pass) : conFac.createConnection();
+
+        } else if (isQueue) {
+            // JMS 1.0.2b queue domain - a queue needs the QueueConnectionFactory view
+            QueueConnectionFactory qConFac = (QueueConnectionFactory) conFac;
+            return useCredentials ?
+                qConFac.createQueueConnection(user, pass) : qConFac.createQueueConnection();
+
+        } else {
+            // JMS 1.0.2b topic domain - a topic needs the TopicConnectionFactory view
+            TopicConnectionFactory tConFac = (TopicConnectionFactory) conFac;
+            return useCredentials ?
+                tConFac.createTopicConnection(user, pass) : tConFac.createTopicConnection();
+        }
+    }
+
+    /**
+     * This is a JMS spec independent method to create a Session. Please be cautious when
+     * making any changes
+     *
+     * @param connection the JMS Connection
+     * @param transacted should the session be transacted?
+     * @param ackMode the ACK mode for the session
+     * @param jmsSpec11 should we use the JMS 1.1 API?
+     * @param isQueue is this Session to deal with a Queue?
+ * @return a Session created for the given information + * @throws JMSException on errors, to be handled and logged by the caller + */ + public static Session createSession(Connection connection, boolean transacted, int ackMode, + boolean jmsSpec11, Boolean isQueue) throws JMSException { + + if (jmsSpec11 || isQueue == null) { + return connection.createSession(transacted, ackMode); + + } else { + if (isQueue) { + return ((QueueConnection) connection).createQueueSession(transacted, ackMode); + } else { + return ((TopicConnection) connection).createTopicSession(transacted, ackMode); + } + } + } + + /** + * This is a JMS spec independent method to create a MessageConsumer. Please be cautious when + * making any changes + * + * @param session JMS session + * @param destination the Destination + * @param isQueue is the Destination a queue? + * @param subscriberName optional client name to use for a durable subscription to a topic + * @param messageSelector optional message selector + * @param pubSubNoLocal should we receive messages sent by us during pub-sub? + * @param isDurable is this a durable topic subscription? + * @param jmsSpec11 should we use JMS 1.1 API ? 
+ * @return a MessageConsumer to receive messages + * @throws JMSException on errors, to be handled and logged by the caller + */ + public static MessageConsumer createConsumer( + Session session, Destination destination, Boolean isQueue, + String subscriberName, String messageSelector, boolean pubSubNoLocal, + boolean isDurable, boolean jmsSpec11) throws JMSException { + + if (jmsSpec11 || isQueue == null) { + if (isDurable) { + return session.createDurableSubscriber( + (Topic) destination, subscriberName, messageSelector, pubSubNoLocal); + } else { + return session.createConsumer(destination, messageSelector, pubSubNoLocal); + } + } else { + if (isQueue) { + return ((QueueSession) session).createReceiver((Queue) destination, messageSelector); + } else { + if (isDurable) { + return ((TopicSession) session).createDurableSubscriber( + (Topic) destination, subscriberName, messageSelector, pubSubNoLocal); + } else { + return ((TopicSession) session).createSubscriber( + (Topic) destination, messageSelector, pubSubNoLocal); + } + } + } + } + + /** + * This is a JMS spec independent method to create a MessageProducer. Please be cautious when + * making any changes + * + * @param session JMS session + * @param destination the Destination + * @param isQueue is the Destination a queue? + * @param jmsSpec11 should we use JMS 1.1 API ? 
+     * @return a MessageProducer to send messages to the given Destination
+     * @throws JMSException on errors, to be handled and logged by the caller
+     */
+    public static MessageProducer createProducer(
+        Session session, Destination destination, Boolean isQueue, boolean jmsSpec11) throws JMSException {
+
+        // generic API: requested explicitly, or the destination type is not known
+        if (jmsSpec11 || isQueue == null) {
+            return session.createProducer(destination);
+        }
+
+        // domain specific API
+        if (isQueue) {
+            return ((QueueSession) session).createSender((Queue) destination);
+        }
+        return ((TopicSession) session).createPublisher((Topic) destination);
+    }
+
+    /**
+     * Create a one time MessageProducer for the given JMS OutTransport information.
+     * For simplicity and best compatibility, the domain specific (JMS 1.0.2b style) API is used
+     * whenever the destination type is known to be a queue or a topic. When the destination
+     * type is neither, the generic Session / MessageProducer API is used, exactly as
+     * createSession() and createProducer() do for an unknown destination type.
+     * Please be cautious when making any changes
+     *
+     * @param jmsOut the JMS OutTransport information (contains all properties)
+     * @return a JMSMessageSender based on one-time use resources
+     * @throws JMSException on errors (including the case where no connection could be
+     *         obtained), to be handled and logged by the caller
+     */
+    public static JMSMessageSender createJMSSender(JMSOutTransportInfo jmsOut)
+        throws JMSException {
+
+        // digest the targetAddress and locate CF from the EPR
+        jmsOut.loadConnectionFactoryFromProperies();
+
+        // create a one time connection and session to be used
+        Hashtable<String,String> jmsProps = jmsOut.getProperties();
+        String user = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_USERNAME) : null;
+        String pass = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_PASSWORD) : null;
+
+        QueueConnectionFactory qConFac = null;
+        TopicConnectionFactory tConFac = null;
+
+        // TRUE for a queue, FALSE for a topic, null when the type is neither (generic)
+        Boolean isQueue = null;
+        if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) {
+            isQueue = Boolean.TRUE;
+            qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory();
+
+        } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) {
+            isQueue = Boolean.FALSE;
+            tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory();
+        }
+
+        Connection connection = null;
+        if (user != null && pass != null) {
+            if (qConFac != null) {
+                connection = qConFac.createQueueConnection(user, pass);
+            } else if (tConFac != null) {
+                connection = tConFac.createTopicConnection(user, pass);
+            }
+        } else {
+            if (qConFac != null) {
+                connection = qConFac.createQueueConnection();
+            } else if (tConFac != null) {
+                connection = tConFac.createTopicConnection();
+            }
+        }
+
+        // a connection opened above belongs to this call only; one handed out by the
+        // JMSConnectionFactory below does not, and must never be closed from here
+        final boolean oneTimeConnection = connection != null;
+
+        if (connection == null && jmsOut.getJmsConnectionFactory() != null) {
+            connection = jmsOut.getJmsConnectionFactory().getConnection();
+        }
+
+        // fail with the documented exception type instead of a NullPointerException below
+        if (connection == null) {
+            throw new JMSException(
+                "Unable to obtain a JMS connection to create a message sender");
+        }
+
+        Destination destination = jmsOut.getDestination();
+        Session session;
+        MessageProducer producer;
+
+        boolean created = false;
+        try {
+            // jmsSpec11 == false : domain specific API for a known queue / topic, the generic
+            // API only when isQueue is null (previously a generic type was forced down the
+            // topic path, casting the connection, session and destination to Topic types)
+            session = createSession(connection, false, Session.AUTO_ACKNOWLEDGE, false, isQueue);
+            producer = createProducer(session, destination, isQueue, false);
+            created = true;
+        } finally {
+            // do not leak a connection opened by this call when the session or producer fails
+            if (!created && oneTimeConnection) {
+                try {
+                    connection.close();
+                } catch (JMSException ignored) {
+                    // best effort cleanup, the original failure is the one that is reported
+                }
+            }
+        }
+
+        return new JMSMessageSender(connection, session, producer,
+            destination, (jmsOut.getJmsConnectionFactory() == null ?
+            JMSConstants.CACHE_NONE : jmsOut.getJmsConnectionFactory().getCacheLevel()), false,
+            isQueue);
+    }
+
+    /**
+     * Return a String representation of the destination type
+     * @param destType the destination type indicator int
+     * @return "Queue" or "Topic" for the respective JMSConstants value, "Generic" otherwise
+     */
+    public static String getDestinationTypeAsString(int destType) {
+        if (destType == JMSConstants.QUEUE) {
+            return "Queue";
+        }
+        if (destType == JMSConstants.TOPIC) {
+            return "Topic";
+        }
+        return "Generic";
+    }
 }