Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java?rev=724432&r1=724431&r2=724432&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java (original) +++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java Mon Dec 8 10:15:40 2008 @@ -17,7 +17,6 @@ import org.apache.axis2.AxisFault; import org.apache.axis2.Constants; -import org.apache.axis2.transport.base.threads.WorkerPool; import org.apache.axis2.transport.base.BaseUtils; import org.apache.axis2.transport.base.BaseConstants; import org.apache.axis2.transport.base.MetricsCollector; @@ -25,73 +24,76 @@ import org.apache.axis2.description.Parameter; import org.apache.axis2.description.AxisService; import org.apache.axis2.description.AxisOperation; -import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.MessageContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import javax.jms.*; import javax.xml.namespace.QName; +import javax.transaction.UserTransaction; /** - * This is the actual receiver which listens for and accepts JMS messages, and - * hands them over to be processed by a worker thread. An instance of this - * class is created for each JMSConnectionFactory, but all instances may and - * will share the same worker thread pool held by the JMSListener + * This is the JMS message receiver which is invoked when a message is received. This processes + * the message through the engine */ -public class JMSMessageReceiver implements MessageListener { +public class JMSMessageReceiver { private static final Log log = LogFactory.getLog(JMSMessageReceiver.class); /** The JMSListener */ private JMSListener jmsListener = null; - /** The thread pool of workers */ - private WorkerPool workerPool = null; - /** The Axis configuration context */ - private ConfigurationContext cfgCtx = null; - /** A reference to the JMS Connection Factory to which this applies */ + /** A reference to the JMS Connection Factory */ private JMSConnectionFactory jmsConnectionFactory = null; - /** The endpoint this message receiver is bound to. */ - final JMSEndpoint endpoint; - /** Metrics collector */ + /** The JMS metrics collector */ private MetricsCollector metrics = null; + /** The endpoint this message receiver is bound to */ + final JMSEndpoint endpoint; /** * Create a new JMSMessage receiver * * @param jmsListener the JMS transport Listener - * @param jmsConFac the JMS connection factory we are associated with - * @param workerPool the worker thread pool to be used - * @param cfgCtx the axis ConfigurationContext + * @param jmsConFac the JMS connection factory we are associated with + * @param workerPool the worker thread pool to be used + * @param cfgCtx the axis ConfigurationContext * @param serviceName the name of the Axis service + * @param endpoint the JMSEndpoint definition to be used */ - JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, - WorkerPool workerPool, ConfigurationContext cfgCtx, JMSEndpoint endpoint) { + JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) { this.jmsListener = jmsListener; this.jmsConnectionFactory = jmsConFac; - this.workerPool = workerPool; - this.cfgCtx = cfgCtx; this.endpoint = endpoint; this.metrics = jmsListener.getMetricsCollector(); } /** - * The entry point on the reception of each JMS message + * Process a new message received * * @param message the JMS message received + * @param ut UserTransaction which was used to receive the message + * @return true if caller should commit */ - public void onMessage(Message message) { - // directly create a new worker and delegate processing + public boolean onMessage(Message message, UserTransaction ut) { + try { if (log.isDebugEnabled()) { StringBuffer sb = new StringBuffer(); - sb.append("Received JMS message to destination : " + message.getJMSDestination()); - sb.append("\nMessage ID : " + message.getJMSMessageID()); - sb.append("\nCorrelation ID : " + message.getJMSCorrelationID()); - sb.append("\nReplyTo ID : " + message.getJMSReplyTo()); + sb.append("Received new JMS message for service :").append(endpoint.getServiceName()); + sb.append("\nDestination : ").append(message.getJMSDestination()); + sb.append("\nMessage ID : ").append(message.getJMSMessageID()); + sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID()); + sb.append("\nReplyTo : ").append(message.getJMSReplyTo()); + sb.append("\nRedelivery ? : ").append(message.getJMSRedelivered()); + sb.append("\nPriority : ").append(message.getJMSPriority()); + sb.append("\nExpiration : ").append(message.getJMSExpiration()); + sb.append("\nTimestamp : ").append(message.getJMSTimestamp()); + sb.append("\nMessage Type : ").append(message.getJMSType()); + sb.append("\nPersistent ? : ").append( + DeliveryMode.PERSISTENT == message.getJMSDeliveryMode()); + log.debug(sb.toString()); if (log.isTraceEnabled() && message instanceof TextMessage) { - log.trace("\nMessage : " + ((TextMessage) message).getText()); + log.trace("\nMessage : " + ((TextMessage) message).getText()); } } } catch (JMSException e) { @@ -109,112 +111,123 @@ // has this message already expired? expiration time == 0 means never expires try { - long expiryTime = message.getJMSExpiration(); + long expiryTime = message.getJMSExpiration(); if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) { if (log.isDebugEnabled()) { log.debug("Discard expired message with ID : " + message.getJMSMessageID()); } - return; + return true; } } catch (JMSException ignore) {} - workerPool.execute(new Worker(message)); - } - private void handleException(String msg, Exception e) { - log.error(msg, e); - throw new AxisJMSException(msg, e); - } + boolean successful = false; + try { + successful = processThoughEngine(message, ut); - private void handleException(String msg) { - log.error(msg); - throw new AxisJMSException(msg); - } + } catch (JMSException e) { + log.error("JMS Exception encountered while processing", e); + } catch (AxisFault e) { + log.error("Axis fault processing message", e); + } catch (Exception e) { + log.error("Unknown error processing message", e); + } finally { + if (successful) { + metrics.incrementMessagesReceived(); + } else { + metrics.incrementFaultsReceiving(); + } + } + + return successful; + } /** - * The actual Worker implementation which will process the - * received JMS messages in the worker thread pool + * Process the new message through Axis2 + * + * @param message the JMS message + * @param ut the UserTransaction used for receipt + * @return true if the caller should commit + * @throws JMSException, on JMS exceptions + * @throws AxisFault on Axis2 errors */ - class Worker implements Runnable { + private boolean processThoughEngine(Message message, UserTransaction ut) + throws JMSException, AxisFault { - private Message message = null; + MessageContext msgContext = jmsListener.createMessageContext(); - Worker(Message message) { - this.message = message; - } + // set the JMS Message ID as the Message ID of the MessageContext + try { + msgContext.setMessageID(message.getJMSMessageID()); + msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID()); + } catch (JMSException ignore) {} - public void run() { + String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION); - MessageContext msgContext = jmsListener.createMessageContext(); + AxisService service = endpoint.getService(); + msgContext.setAxisService(service); - // set the JMS Message ID as the Message ID of the MessageContext - try { - msgContext.setMessageID(message.getJMSMessageID()); - msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID()); - } catch (JMSException ignore) {} + // find the operation for the message, or default to one + Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM); + QName operationQName = ( + operationParam != null ? + BaseUtils.getQNameFromString(operationParam.getValue()) : + BaseConstants.DEFAULT_OPERATION); + + AxisOperation operation = service.getOperation(operationQName); + if (operation != null) { + msgContext.setAxisOperation(operation); + msgContext.setSoapAction("urn:" + operation.getName().getLocalPart()); + } - AxisService service = null; - try { - String soapAction = JMSUtils. - getProperty(message, BaseConstants.SOAPACTION); + ContentTypeInfo contentTypeInfo = + endpoint.getContentTypeRuleSet().getContentTypeInfo(message); + if (contentTypeInfo == null) { + throw new AxisFault("Unable to determine content type for message " + + msgContext.getMessageID()); + } - service = endpoint.getService(); - msgContext.setAxisService(service); + // set the message property OUT_TRANSPORT_INFO + // the reply is assumed to be over the JMSReplyTo destination, using + // the same incoming connection factory, if a JMSReplyTo is available + Destination replyTo = message.getJMSReplyTo(); + if (replyTo == null) { + // does the service specify a default reply destination ? + Parameter param = service.getParameter(JMSConstants.PARAM_REPLY_DESTINATION); + if (param != null && param.getValue() != null) { + replyTo = jmsConnectionFactory.getDestination((String) param.getValue()); + } - // find the operation for the message, or default to one - Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM); - QName operationQName = ( - operationParam != null ? - BaseUtils.getQNameFromString(operationParam.getValue()) : - BaseConstants.DEFAULT_OPERATION); + } + if (replyTo != null) { + msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, + new JMSOutTransportInfo(jmsConnectionFactory, replyTo, + contentTypeInfo.getPropertyName())); + } - AxisOperation operation = service.getOperation(operationQName); - if (operation != null) { - msgContext.setAxisOperation(operation); - msgContext.setSoapAction("urn:" + operation.getName().getLocalPart()); - } + JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType()); + if (ut != null) { + msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut); + } - ContentTypeInfo contentTypeInfo = - endpoint.getContentTypeRuleSet().getContentTypeInfo(message); - if (contentTypeInfo == null) { - throw new AxisFault("Unable to determine content type for message " + - msgContext.getMessageID()); - } - - // set the message property OUT_TRANSPORT_INFO - // the reply is assumed to be over the JMSReplyTo destination, using - // the same incoming connection factory, if a JMSReplyTo is available - Destination replyTo = message.getJMSReplyTo(); - if (replyTo == null) { - // does the service specify a default reply destination ? - Parameter param = service.getParameter(JMSConstants.REPLY_PARAM); - if (param != null && param.getValue() != null) { - replyTo = jmsConnectionFactory.getDestination((String) param.getValue()); - } - - } - if (replyTo != null) { - msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, - new JMSOutTransportInfo(jmsConnectionFactory, replyTo, - contentTypeInfo.getPropertyName())); + try { + jmsListener.handleIncomingMessage( + msgContext, + JMSUtils.getTransportHeaders(message), + soapAction, + contentTypeInfo.getContentType()); + + } finally { + + Object o = msgContext.getProperty(BaseConstants.SET_ROLLBACK_ONLY); + if (o != null) { + if ((o instanceof Boolean && ((Boolean) o)) || + (o instanceof String && Boolean.valueOf((String) o))) { + return false; } - - JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType()); - - jmsListener.handleIncomingMessage( - msgContext, - JMSUtils.getTransportHeaders(message), - soapAction, - contentTypeInfo.getContentType() - ); - metrics.incrementMessagesReceived(); - - } catch (Throwable e) { - metrics.incrementFaultsReceiving(); - jmsListener.error(service, e); - log.error("Exception while processing incoming message", e); } + return true; } } }
Added: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java?rev=724432&view=auto ============================================================================== --- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java (added) +++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java Mon Dec 8 10:15:40 2008 @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.axis2.transport.jms; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.transport.base.BaseConstants; + +import javax.jms.*; +import javax.transaction.*; + +/** + * Performs the actual sending of a JMS message, and the subsequent committing of a JTA transaction + * (if requested) or the local session transaction, if used. An instance of this class is unique + * to a single message send out operation and will not be shared. + */ +public class JMSMessageSender { + + private static final Log log = LogFactory.getLog(JMSMessageSender.class); + + /** The Connection to be used to send out */ + private Connection connection = null; + /** The Session to be used to send out */ + private Session session = null; + /** The MessageProducer used */ + private MessageProducer producer = null; + /** Target Destination */ + private Destination destination = null; + /** The level of cachability for resources */ + private int cacheLevel = JMSConstants.CACHE_CONNECTION; + /** Should this sender use JMS 1.1 ? (if false, defaults to 1.0.2b) */ + private boolean jmsSpec11 = true; + /** Are we sending to a Queue ? */ + private Boolean isQueue = null; + + /** + * This is a low-end method to support the one-time sends using JMS 1.0.2b + * @param connection the JMS Connection + * @param session JMS Session + * @param producer the MessageProducer + * @param destination the JMS Destination + * @param cacheLevel cacheLevel - None | Connection | Session | Producer + * @param jmsSpec11 true if the JMS 1.1 API should be used + * @param isQueue posting to a Queue? + */ + public JMSMessageSender(Connection connection, Session session, MessageProducer producer, + Destination destination, int cacheLevel, boolean jmsSpec11, Boolean isQueue) { + + this.connection = connection; + this.session = session; + this.producer = producer; + this.destination = destination; + this.cacheLevel = cacheLevel; + this.jmsSpec11 = jmsSpec11; + this.isQueue = isQueue; + } + + /** + * Create a JMSSender using a JMSConnectionFactory and target EPR + * + * @param jmsConnectionFactory the JMSConnectionFactory + * @param targetAddress target EPR + */ + public JMSMessageSender(JMSConnectionFactory jmsConnectionFactory, String targetAddress) { + + if (jmsConnectionFactory != null) { + this.cacheLevel = jmsConnectionFactory.getCacheLevel(); + this.jmsSpec11 = jmsConnectionFactory.isJmsSpec11(); + this.connection = jmsConnectionFactory.getConnection(); + this.session = jmsConnectionFactory.getSession(connection); + this.destination = + jmsConnectionFactory.getSharedDestination() == null ? + jmsConnectionFactory.getDestination(JMSUtils.getDestination(targetAddress)) : + jmsConnectionFactory.getSharedDestination(); + this.producer = jmsConnectionFactory.getMessageProducer(connection, session, destination); + + } else { + JMSOutTransportInfo jmsOut = new JMSOutTransportInfo(targetAddress); + jmsOut.loadConnectionFactoryFromProperies(); + } + } + + /** + * Perform actual send of JMS message to the Destination selected + * + * @param message the JMS message + * @param msgCtx the Axis2 MessageContext + */ + public void send(Message message, MessageContext msgCtx) { + + Boolean jtaCommit = getBooleanProperty(msgCtx, BaseConstants.JTA_COMMIT_AFTER_SEND); + Boolean rollbackOnly = getBooleanProperty(msgCtx, BaseConstants.SET_ROLLBACK_ONLY); + Boolean persistent = getBooleanProperty(msgCtx, JMSConstants.JMS_DELIVERY_MODE); + Integer priority = getIntegerProperty(msgCtx, JMSConstants.JMS_PRIORITY); + Integer timeToLive = getIntegerProperty(msgCtx, JMSConstants.JMS_TIME_TO_LIVE); + + // Do not commit, if message is marked for rollback + if (rollbackOnly != null && rollbackOnly) { + jtaCommit = Boolean.FALSE; + } + + if (persistent != null) { + try { + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } catch (JMSException e) { + handleException("Error setting JMS Producer for PERSISTENT delivery", e); + } + } + if (priority != null) { + try { + producer.setPriority(priority); + } catch (JMSException e) { + handleException("Error setting JMS Producer priority to : " + priority, e); + } + } + if (timeToLive != null) { + try { + producer.setTimeToLive(timeToLive); + } catch (JMSException e) { + handleException("Error setting JMS Producer TTL to : " + timeToLive, e); + } + } + + boolean sendingSuccessful = false; + // perform actual message sending + try { + if (jmsSpec11 || isQueue == null) { + producer.send(message); + + } else { + if (isQueue) { + ((QueueSender) producer).send(message); + + } else { + ((TopicPublisher) producer).publish(message); + } + } + + // set the actual MessageID to the message context for use by any others down the line + String msgId = null; + try { + msgId = message.getJMSMessageID(); + if (msgId != null) { + msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId); + } + } catch (JMSException ignore) {} + + sendingSuccessful = true; + + if (log.isDebugEnabled()) { + log.debug("Sent Message Context ID : " + msgCtx.getMessageID() + + " with JMS Message ID : " + msgId + + " to destination : " + producer.getDestination()); + } + + } catch (JMSException e) { + log.error("Error sending message with MessageContext ID : " + + msgCtx.getMessageID() + " to destination : " + destination, e); + + } finally { + + if (jtaCommit != null) { + + UserTransaction ut = (UserTransaction) msgCtx.getProperty(BaseConstants.USER_TRANSACTION); + if (ut != null) { + + try { + if (sendingSuccessful && jtaCommit) { + ut.commit(); + } else { + ut.rollback(); + } + msgCtx.removeProperty(BaseConstants.USER_TRANSACTION); + + if (log.isDebugEnabled()) { + log.debug((sendingSuccessful ? "Committed" : "Rolled back") + + " JTA Transaction"); + } + + } catch (Exception e) { + handleException("Error committing/rolling back JTA transaction after " + + "sending of message with MessageContext ID : " + msgCtx.getMessageID() + + " to destination : " + destination, e); + } + } + + } else { + try { + if (session.getTransacted()) { + if (sendingSuccessful && (rollbackOnly == null || !rollbackOnly)) { + session.commit(); + } else { + session.rollback(); + } + } + + if (log.isDebugEnabled()) { + log.debug((sendingSuccessful ? "Committed" : "Rolled back") + + " local (JMS Session) Transaction"); + } + + } catch (JMSException e) { + handleException("Error committing/rolling back local (i.e. session) " + + "transaction after sending of message with MessageContext ID : " + + msgCtx.getMessageID() + " to destination : " + destination, e); + } + } + } + } + + /** + * Close non-shared producer, session and connection if any + */ + public void close() { + if (producer != null && cacheLevel < JMSConstants.CACHE_PRODUCER) { + try { + producer.close(); + } catch (JMSException e) { + log.error("Error closing JMS MessageProducer after send", e); + } finally { + producer = null; + } + } + + if (session != null && cacheLevel < JMSConstants.CACHE_SESSION) { + try { + session.close(); + } catch (JMSException e) { + log.error("Error closing JMS Session after send", e); + } finally { + session = null; + } + } + + if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { + try { + connection.close(); + } catch (JMSException e) { + log.error("Error closing JMS Connection after send", e); + } finally { + connection = null; + } + } + } + + private void handleException(String message, Exception e) { + log.error(message, e); + throw new AxisJMSException(message, e); + } + + private Boolean getBooleanProperty(MessageContext msgCtx, String name) { + Object o = msgCtx.getProperty(name); + if (o != null) { + if (o instanceof Boolean) { + return (Boolean) o; + } else if (o instanceof String) { + return Boolean.valueOf((String) o); + } + } + return null; + } + + private Integer getIntegerProperty(MessageContext msgCtx, String name) { + Object o = msgCtx.getProperty(name); + if (o != null) { + if (o instanceof Integer) { + return (Integer) o; + } else if (o instanceof String) { + return Integer.parseInt((String) o); + } + } + return null; + } + + public void setConnection(Connection connection) { + this.connection = connection; + } + + public void setSession(Session session) { + this.session = session; + } + + public void setProducer(MessageProducer producer) { + this.producer = producer; + } + + public void setCacheLevel(int cacheLevel) { + this.cacheLevel = cacheLevel; + } + + public int getCacheLevel() { + return cacheLevel; + } + + public Connection getConnection() { + return connection; + } + + public MessageProducer getProducer() { + return producer; + } + + public Session getSession() { + return session; + } +} Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java?rev=724432&r1=724431&r2=724432&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java (original) +++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java Mon Dec 8 10:15:40 2008 @@ -41,8 +41,8 @@ /** The naming context */ private Context context; /** - * this is a reference to the underlying JMS connection factory when sending messages - * through connection factories not defined to the transport sender + * this is a reference to the underlying JMS ConnectionFactory when sending messages + * through connection factories not defined at the TransportSender level */ private ConnectionFactory connectionFactory = null; /** @@ -53,14 +53,18 @@ private JMSConnectionFactory jmsConnectionFactory = null; /** the Destination queue or topic for the outgoing message */ private Destination destination = null; - /** the Destination queue or topic for the outgoing message i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */ - private String destinationType = JMSConstants.DESTINATION_TYPE_QUEUE; + /** the Destination queue or topic for the outgoing message + * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC + */ + private String destinationType = JMSConstants.DESTINATION_TYPE_GENERIC; /** the Reply Destination queue or topic for the outgoing message */ private Destination replyDestination = null; /** the Reply Destination name */ private String replyDestinationName = null; - /** the Reply Destination queue or topic for the outgoing message i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */ - private String replyDestinationType = JMSConstants.DESTINATION_TYPE_QUEUE; + /** the Reply Destination queue or topic for the outgoing message + * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC + */ + private String replyDestinationType = JMSConstants.DESTINATION_TYPE_GENERIC; /** the EPR properties when the out-transport info is generated from a target EPR */ private Hashtable<String,String> properties = null; /** the target EPR string where applicable */ @@ -68,13 +72,12 @@ /** the message property name that stores the content type of the outgoing message */ private String contentTypeProperty; - private String contentType = null; - /** * Creates an instance using the given JMS connection factory and destination * * @param jmsConnectionFactory the JMS connection factory * @param dest the destination + * @param contentTypeProperty */ JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest, String contentTypeProperty) { @@ -91,26 +94,31 @@ * @param targetEPR the target EPR */ JMSOutTransportInfo(String targetEPR) { + this.targetEPR = targetEPR; if (!targetEPR.startsWith(JMSConstants.JMS_PREFIX)) { handleException("Invalid prefix for a JMS EPR : " + targetEPR); + } else { properties = BaseUtils.getEPRProperties(targetEPR); - String destinationType = properties.get(JMSConstants.DEST_PARAM_TYPE); - if(destinationType != null) { + String destinationType = properties.get(JMSConstants.PARAM_DEST_TYPE); + if (destinationType != null) { setDestinationType(destinationType); } - String replyDestinationType = properties.get(JMSConstants.REPLY_PARAM_TYPE); - if(replyDestinationType != null) { + + String replyDestinationType = properties.get(JMSConstants.PARAM_REPLY_DEST_TYPE); + if (replyDestinationType != null) { setReplyDestinationType(replyDestinationType); } - replyDestinationName = properties.get(JMSConstants.REPLY_PARAM); + + replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION); contentTypeProperty = properties.get(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); try { context = new InitialContext(properties); } catch (NamingException e) { handleException("Could not get an initial context using " + properties, e); } + destination = getDestination(context, targetEPR); replyDestination = getReplyDestination(context, targetEPR); } @@ -136,7 +144,7 @@ private ConnectionFactory getConnectionFactory(Context context, Hashtable<String,String> props) { try { - String conFacJndiName = props.get(JMSConstants.CONFAC_JNDI_NAME_PARAM); + String conFacJndiName = props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME); if (conFacJndiName != null) { return JMSUtils.lookup(context, ConnectionFactory.class, conFacJndiName); } else { @@ -160,8 +168,12 @@ try { return JMSUtils.lookup(context, Destination.class, destinationName); } catch (NameNotFoundException e) { - if (log.isDebugEnabled()) { - log.debug("Cannot locate destination : " + destinationName + " using " + url); + try { + return JMSUtils.lookup(context, Destination.class, + (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ? + "dynamicTopics/" : "dynamicQueues/") + destinationName); + } catch (NamingException x) { + handleException("Cannot locate destination : " + destinationName + " using " + url); } } catch (NamingException e) { handleException("Cannot locate destination : " + destinationName + " using " + url, e); @@ -177,10 +189,11 @@ * @return the JMS destination, or null if it does not exist */ private Destination getReplyDestination(Context context, String url) { - String replyDestinationName = properties.get(JMSConstants.REPLY_PARAM); + String replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION); if(replyDestinationName == null) { return null; } + try { return JMSUtils.lookup(context, Destination.class, replyDestinationName); } catch (NameNotFoundException e) { @@ -190,13 +203,14 @@ } catch (NamingException e) { handleException("Cannot locate destination : " + replyDestinationName + " using " + url, e); } + return null; } /** * Look up for the given destination - * @param replyDest - * @return + * @param replyDest the JNDI name to lookup Destination required + * @return Destination for the JNDI name passed */ public Destination getReplyDestination(String replyDest) { try { @@ -236,7 +250,7 @@ } public void setContentType(String contentType) { - this.contentType = contentType; + // this is a useless Axis2 method imposed by the OutTransportInfo interface :( } public Hashtable<String,String> getProperties() { Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=724432&r1=724431&r2=724432&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original) +++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Mon Dec 8 10:15:40 2008 @@ -20,6 +20,7 @@ import org.apache.axiom.om.OMText; import org.apache.axiom.om.OMNode; import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; import org.apache.axis2.context.MessageContext; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.description.TransportOutDescription; @@ -29,11 +30,9 @@ import org.apache.axis2.transport.base.*; import org.apache.axis2.transport.base.streams.WriterOutputStream; import org.apache.axis2.transport.http.HTTPConstants; -import org.apache.commons.logging.LogFactory; import javax.jms.*; import javax.activation.DataHandler; -import javax.naming.Context; import java.io.IOException; import java.io.OutputStream; import java.io.StringWriter; @@ -45,34 +44,23 @@ */ public class JMSSender extends AbstractTransportSender implements ManagementSupport { - public static final String TRANSPORT_NAME = "jms"; - - private JMSConnectionFactoryManager connFacManager; + public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS; - public JMSSender() { - log = LogFactory.getLog(JMSSender.class); - } + /** The JMS connection factory manager to be used when sending messages out */ + private JMSConnectionFactoryManager connFacManager; /** * Initialize the transport sender by reading pre-defined connection factories for - * outgoing messages. These will create sessions (one per each destination dealth with) - * to be used when messages are being sent. + * outgoing messages. + * * @param cfgCtx the configuration context * @param transportOut the transport sender definition from axis2.xml * @throws AxisFault on error */ public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault { super.init(cfgCtx, transportOut); - connFacManager = new JMSConnectionFactoryManager(cfgCtx); - // read the connection factory definitions and create them - connFacManager.loadConnectionFactoryDefinitions(transportOut); - connFacManager.start(); - } - - @Override - public void stop() { - connFacManager.stop(); - super.stop(); + connFacManager = new JMSConnectionFactoryManager(transportOut); + log.info("JMS Transport Sender initialized..."); } /** @@ -84,9 +72,9 @@ */ private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) { Map<String,String> props = trpInfo.getProperties(); - if(trpInfo.getProperties() != null) { - String jmsConnectionFactoryName = props.get(JMSConstants.CONFAC_PARAM); - if(jmsConnectionFactoryName != null) { + if (trpInfo.getProperties() != null) { + String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC); + if (jmsConnectionFactoryName != null) { return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName); } else { return connFacManager.getJMSConnectionFactory(props); @@ -103,58 +91,85 @@ OutTransportInfo outTransportInfo) throws AxisFault { JMSConnectionFactory jmsConnectionFactory = null; - Connection connection = null; // holds a one time connection if used - JMSOutTransportInfo jmsOut; - Session session = null; - Destination replyDestination = null; + JMSOutTransportInfo jmsOut = null; + JMSMessageSender messageSender = null; - try { - if (targetAddress != null) { + if (targetAddress != null) { - jmsOut = new JMSOutTransportInfo(targetAddress); - // do we have a definition for a connection factory to use for this address? - jmsConnectionFactory = getJMSConnectionFactory(jmsOut); + jmsOut = new JMSOutTransportInfo(targetAddress); + // do we have a definition for a connection factory to use for this address? + jmsConnectionFactory = getJMSConnectionFactory(jmsOut); + + if (jmsConnectionFactory != null) { + messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress); - if (jmsConnectionFactory != null) { - // create new or get existing session to send to the destination from the CF - session = jmsConnectionFactory.getSessionForDestination( - JMSUtils.getDestination(targetAddress)); + } else { + try { + messageSender = JMSUtils.createJMSSender(jmsOut); + } catch (JMSException e) { + handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); + } + } - } else { - // digest the targetAddress and locate CF from the EPR - jmsOut.loadConnectionFactoryFromProperies(); - try { - // create a one time connection and session to be used - Hashtable<String,String> jndiProps = jmsOut.getProperties(); - connection = JMSUtils.createConnection(jmsOut.getConnectionFactory(), - jndiProps.get(Context.SECURITY_PRINCIPAL), - jndiProps.get(Context.SECURITY_CREDENTIALS), - jmsOut.getDestinationType()); + } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) { - session = JMSUtils.createSession(connection, false, - Session.AUTO_ACKNOWLEDGE, jmsOut.getDestinationType()); + jmsOut = (JMSOutTransportInfo) outTransportInfo; + try { + messageSender = JMSUtils.createJMSSender(jmsOut); + } catch (JMSException e) { + handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); + } + } - } catch (JMSException e) { - handleException("Error creating a connection/session for : " + targetAddress, e); - } - } - replyDestination = jmsOut.getReplyDestination(); + // The message property to be used to send the content type is determined by + // the out transport info, i.e. either from the EPR if we are sending a request, + // or, if we are sending a response, from the configuration of the service that + // received the request). The property name can be overridden by a message + // context property. + String contentTypeProperty = + (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); + if (contentTypeProperty == null) { + contentTypeProperty = jmsOut.getContentTypeProperty(); + } + + // need to synchronize as Sessions are not thread safe + synchronized (messageSender.getSession()) { + try { + sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut); + } finally { + messageSender.close(); + } + } + } - } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) { + /** + * Perform actual sending of the JMS message + */ + private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender, + String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory, + JMSOutTransportInfo jmsOut) throws AxisFault { + + // convert the axis message context into a JMS Message that we can send over JMS + Message message = null; + String correlationId = null; + try { + message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty); + } catch (JMSException e) { + handleException("Error creating a JMS message from the message context", e); + } - jmsOut = (JMSOutTransportInfo) outTransportInfo; - jmsConnectionFactory = jmsOut.getJmsConnectionFactory(); + // should we wait for a synchronous response on this same thread? + boolean waitForResponse = waitForSynchronousResponse(msgCtx); + Destination replyDestination = jmsOut.getReplyDestination(); - session = jmsConnectionFactory.getSessionForDestination( - jmsOut.getDestination().toString()); - } else { - handleException("Unable to get JMSOutTransportInfo"); - return; // We never get here. Just make the compiler happy. - } - - Destination destination = jmsOut.getDestination(); + // if this is a synchronous out-in, prepare to listen on the response destination + if (waitForResponse) { String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO); + if (replyDestName == null && jmsConnectionFactory != null) { + replyDestName = jmsConnectionFactory.getReplyToDestination(); + } + if (replyDestName != null) { if (jmsConnectionFactory != null) { replyDestination = jmsConnectionFactory.getDestination(replyDestName); @@ -162,107 +177,45 @@ replyDestination = jmsOut.getReplyDestination(replyDestName); } } + replyDestination = JMSUtils.setReplyDestination( + replyDestination, messageSender.getSession(), message); + } - if(session == null) { - handleException("Could not create JMS session"); - } - - // now we are going to use the JMS session, but if this was a session from a - // defined JMS connection factory, we need to synchronize as sessions are not - // thread safe - synchronized(session) { - // The message property to be used to send the content type is determined by - // the out transport info, i.e. either from the EPR if we are sending a request, - // or, if we are sending a response, from the configuration of the service that - // received the request). The property name can be overridden by a message - // context property. - String contentTypeProperty = - (String)msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); - if (contentTypeProperty == null) { - contentTypeProperty = jmsOut.getContentTypeProperty(); - } - - // convert the axis message context into a JMS Message that we can send over JMS - Message message = null; - String correlationId = null; - try { - message = createJMSMessage(msgCtx, session, contentTypeProperty); - } catch (JMSException e) { - handleException("Error creating a JMS message from the axis message context", e); - } - - String destinationType = jmsOut.getDestinationType(); - - // if the destination does not exist, see if we can create it - destination = JMSUtils.createDestinationIfRequired( - destination, destinationType, targetAddress, session); - - if(jmsOut.getReplyDestinationName() != null) { - replyDestination = JMSUtils.createReplyDestinationIfRequired( - replyDestination, jmsOut.getReplyDestinationName(), - jmsOut.getReplyDestinationType(), targetAddress, session); - } - - // should we wait for a synchronous response on this same thread? - boolean waitForResponse = waitForSynchronousResponse(msgCtx); - - // if this is a synchronous out-in, prepare to listen on the response destination - if (waitForResponse) { - replyDestination = JMSUtils.setReplyDestination( - replyDestination, session, message); - } + try { + messageSender.send(message, msgCtx); + metrics.incrementMessagesSent(); - // send the outgoing message over JMS to the destination selected - try { - JMSUtils.sendMessageToJMSDestination(session, destination, destinationType, message); + } catch (AxisJMSException e) { + metrics.incrementFaultsSending(); + handleException("Error sending JMS message", e); + } - // set the actual MessageID to the message context for use by any others - try { - String msgId = message.getJMSMessageID(); - if (msgId != null) { - msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId); - } - } catch (JMSException ignore) {} + try { + metrics.incrementBytesSent(JMSUtils.getMessageSize(message)); + } catch (JMSException e) { + log.warn("Error reading JMS message size to update transport metrics", e); + } - metrics.incrementMessagesSent(); - try { - metrics.incrementBytesSent(JMSUtils.getMessageSize(message)); - } catch (JMSException e) { - log.warn("Error reading JMS message size to update transport metrics", e); - } - } catch (BaseTransportException e) { - metrics.incrementFaultsSending(); - throw e; - } + // if we are expecting a synchronous response back for the message sent out + if (waitForResponse) { + // TODO ******************************************************************************** + // TODO **** replace with asynchronous polling via a poller task to process this ******* + // information would be given. Then it should poll (until timeout) the + // requested destination for the response message and inject it from a + // asynchronous worker thread + try { + messageSender.getConnection().start(); // multiple calls are safely ignored + } catch (JMSException ignore) {} - // if we are expecting a synchronous response back for the message sent out - if (waitForResponse) { - if (connection != null) { - try { - connection.start(); - } catch (JMSException ignore) {} - } else { - // If connection is null, we are using a cached session and the underlying - // connection is already started. Thus, there is nothing to do here. - } - try { - correlationId = message.getJMSMessageID(); - } catch(JMSException ignore) {} - - // We assume here that the response uses the same message property to - // specify the content type of the message. - waitForResponseAndProcess(session, replyDestination, - jmsOut.getReplyDestinationType(), msgCtx, correlationId, - contentTypeProperty); - } - } + try { + correlationId = message.getJMSMessageID(); + } catch(JMSException ignore) {} - } finally { - if (connection != null) { - try { - connection.close(); - } catch (JMSException ignore) {} - } + // We assume here that the response uses the same message property to + // specify the content type of the message. + waitForResponseAndProcess(messageSender.getSession(), replyDestination, + msgCtx, correlationId, contentTypeProperty); + // TODO ******************************************************************************** } } @@ -278,17 +231,13 @@ * @throws AxisFault on error */ private void waitForResponseAndProcess(Session session, Destination replyDestination, - String replyDestinationType, MessageContext msgCtx, String correlationId, + MessageContext msgCtx, String correlationId, String contentTypeProperty) throws AxisFault { try { MessageConsumer consumer; - if (correlationId != null) { - consumer = JMSUtils.createConsumer(session, replyDestination, - "JMSCorrelationID = '" + correlationId + "'"); - } else { - consumer = JMSUtils.createConsumer(session, replyDestination); - } + consumer = JMSUtils.createConsumer(session, replyDestination, + "JMSCorrelationID = '" + correlationId + "'"); // how long are we willing to wait for the sync response long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT; @@ -332,8 +281,9 @@ } catch (JMSException e) { metrics.incrementFaultsReceiving(); - handleException("Error creating consumer or receiving reply to : " + - replyDestination, e); + handleException("Error creating a consumer, or receiving a synchronous reply " + + "for outgoing MessageContext ID : " + msgCtx.getMessageID() + + " and reply Destination : " + replyDestination, e); } }