Author: veithen
Date: Sat Aug 23 13:43:57 2008
New Revision: 688410
URL: http://svn.apache.org/viewvc?rev=688410&view=rev
Log:
SYNAPSE-369: Eliminated JMSConnectionFactory#serviceDestinationNameMapping and
modified the JMS transport listener to use one JMSMessageReceiver instance per
Axis service.
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java?rev=688410&r1=688409&r2=688410&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSConnectionFactory.java Sat Aug 23 13:43:57 2008
@@ -20,6 +20,7 @@
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.synapse.transport.base.BaseUtils;
+import org.apache.synapse.transport.base.threads.WorkerPool;
import javax.jms.*;
import javax.naming.Context;
@@ -77,14 +78,16 @@
/** The name used for the connection factory definition within Axis2 */
private String name = null;
+ /** The JMS transport listener instance. */
+ private final JMSListener jmsListener;
+ /** The worker pool to use. */
+ private final WorkerPool workerPool;
/** The JNDI name of the actual connection factory */
private String connFactoryJNDIName = null;
/** Map of destination JNDI names to service names */
private Map<String,String> serviceJNDINameMapping = null;
/** Map of destination JNDI names to destination types*/
private Map<String,String> destinationTypeMapping = null;
- /** Map of JMS destination names to service names */
- private Map<String,String> serviceDestinationNameMapping = null;
/** JMS Sessions currently active. One session for each Destination / Service */
private Map<String,Session> jmsSessions = null;
/** Properties of the connection factory to acquire the initial context */
@@ -97,8 +100,6 @@
private String connectionFactoryType = null;
/** The JMS Connection opened */
private Connection connection = null;
- /** The JMS Message receiver for this connection factory */
- private JMSMessageReceiver jmsMessageReceiver = null;
/** The axis2 configuration context */
private ConfigurationContext cfgCtx = null;
/** if connection dropped, reconnect timeout in milliseconds; default 30 seconds */
@@ -110,14 +111,19 @@
*
* @param name the connection factory name specified in the axis2.xml for the
* TransportListener or the TransportSender using this
+ * @param jmsListener the JMS transport listener, or null if the connection factory
+ * is not linked to a transport listener
+ * @param workerPool the worker pool to be used to process incoming messages; may be null
* @param cfgCtx the axis2 configuration context
*/
- public JMSConnectionFactory(String name, ConfigurationContext cfgCtx) {
+ public JMSConnectionFactory(String name, JMSListener jmsListener,
WorkerPool workerPool,
+ ConfigurationContext cfgCtx) {
this.name = name;
+ this.jmsListener = jmsListener;
+ this.workerPool = workerPool;
this.cfgCtx = cfgCtx;
serviceJNDINameMapping = new HashMap<String,String>();
destinationTypeMapping = new HashMap<String,String>();
- serviceDestinationNameMapping = new HashMap<String,String>();
jndiProperties = new Hashtable<String,String>();
jmsSessions = new HashMap<String,Session>();
}
@@ -155,7 +161,6 @@
serviceJNDINameMapping.put(destinationJNDIName, serviceName);
destinationTypeMapping.put(destinationJNDIName, destinationType);
- serviceDestinationNameMapping.put(destinationName, serviceName);
log.info("Mapped JNDI name : " + destinationJNDIName + " and JMS
Destination name : " +
destinationName + " against service : " + serviceName);
@@ -167,15 +172,8 @@
* @param jndiDestinationName the JNDI name of the JMS destination to be removed
*/
public void removeDestination(String jndiDestinationName) {
-
- // find and save provider specific Destination name before we delete
- String providerSpecificDestination =
getPhysicalDestinationName(jndiDestinationName);
stoplisteningOnDestination(jndiDestinationName);
-
serviceJNDINameMapping.remove(jndiDestinationName);
- if (providerSpecificDestination != null) {
- serviceDestinationNameMapping.remove(providerSpecificDestination);
- }
}
/**
@@ -250,9 +248,11 @@
handleException("Error connecting to Connection Factory : " +
connFactoryJNDIName, e);
}
- for (String destJNDIName : serviceJNDINameMapping.keySet()) {
+ for (Map.Entry<String,String> entry :
serviceJNDINameMapping.entrySet()) {
+ String destJNDIName = entry.getKey();
+ String serviceName = entry.getValue();
String destinationType = destinationTypeMapping.get(destJNDIName);
- startListeningOnDestination(destJNDIName, destinationType);
+ startListeningOnDestination(destJNDIName, destinationType,
serviceName);
}
connection.start(); // indicate readiness to start receiving messages
@@ -296,7 +296,9 @@
*
* @param destinationJNDIname the JMS destination to listen on
*/
- public void startListeningOnDestination(String destinationJNDIname, String
destinationType) {
+ public void startListeningOnDestination(String destinationJNDIname,
+ String destinationType,
+ String serviceName) {
Session session = jmsSessions.get(destinationJNDIname);
// if we already had a session open, close it first
@@ -319,7 +321,8 @@
}
MessageConsumer consumer = JMSUtils.createConsumer(session,
destination);
- consumer.setMessageListener(jmsMessageReceiver);
+ consumer.setMessageListener(new JMSMessageReceiver(jmsListener,
this, workerPool,
+ cfgCtx, serviceName));
jmsSessions.put(destinationJNDIname, session);
// catches NameNotFound and JMSExceptions and marks service as faulty
@@ -491,38 +494,6 @@
}
// -------------------- getters and setters and trivial methods --------------------
- /**
- * Return the service name using the JMS destination given by the JNDI name
- *
- * @param jmsDestinationName the JMS destination name
- * @return the name of the service using the destination
- */
- public String getServiceNameForDestinationName(String jmsDestinationName) {
- return serviceDestinationNameMapping.get(jmsDestinationName);
- }
-
- /**
- * Return the service name using the JMS destination and its JNDI name
- *
- * @param dest the JMS Destination Queue or Topic
- * @param jmsDestinationName the JMS destination name
- * @return the name of the service using the destination
- */
- public String getServiceNameForDestination(Destination dest, String
jmsDestinationName) {
- String serviceName =
serviceDestinationNameMapping.get(jmsDestinationName);
-
- // hack to get around the crazy Active MQ dynamic queue and topic issues
- if (serviceName == null) {
- String provider =
getJndiProperties().get(Context.INITIAL_CONTEXT_FACTORY);
- if (provider.indexOf("activemq") != -1) {
- serviceName = getServiceNameForJNDIName(
- (dest instanceof Queue ?
- JMSConstants.ACTIVEMQ_DYNAMIC_QUEUE :
- JMSConstants.ACTIVEMQ_DYNAMIC_TOPIC) +
jmsDestinationName);
- }
- }
- return serviceName;
- }
/**
* Return the service name using the JMS destination given by the JNDI name
@@ -565,18 +536,10 @@
return jndiProperties;
}
- public JMSMessageReceiver getJmsMessageReceiver() {
- return jmsMessageReceiver;
- }
-
public Context getContext() {
return context;
}
- public void setJmsMessageReceiver(JMSMessageReceiver jmsMessageReceiver) {
- this.jmsMessageReceiver = jmsMessageReceiver;
- }
-
private void handleException(String msg, Exception e) throws
AxisJMSException {
log.error(msg, e);
throw new AxisJMSException(msg, e);
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java?rev=688410&r1=688409&r2=688410&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java Sat Aug 23 13:43:57 2008
@@ -94,9 +94,6 @@
public void start() throws AxisFault {
for (JMSConnectionFactory conFac : connectionFactories.values()) {
- conFac.setJmsMessageReceiver(
- new JMSMessageReceiver(this, conFac, workerPool, cfgCtx));
-
try {
conFac.connectAndListen();
} catch (JMSException e) {
@@ -167,7 +164,7 @@
log.info("Starting to listen on destination : " + destinationName + "
of type "
+ destinationType + " for service " + service.getName());
cf.addDestination(destinationName, destinationType, service.getName());
- cf.startListeningOnDestination(destinationName, destinationType);
+ cf.startListeningOnDestination(destinationName, destinationType,
service.getName());
}
/**
@@ -229,7 +226,7 @@
Parameter conFacParams = (Parameter) conFacIter.next();
JMSConnectionFactory jmsConFactory =
- new JMSConnectionFactory(conFacParams.getName(), cfgCtx);
+ new JMSConnectionFactory(conFacParams.getName(), this,
workerPool, cfgCtx);
JMSUtils.setConnectionFactoryParameters(conFacParams,
jmsConFactory);
connectionFactories.put(jmsConFactory.getName(), jmsConFactory);
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java?rev=688410&r1=688409&r2=688410&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSMessageReceiver.java Sat Aug 23 13:43:57 2008
@@ -50,6 +50,8 @@
private ConfigurationContext cfgCtx = null;
/** A reference to the JMS Connection Factory to which this applies */
private JMSConnectionFactory jmsConnectionFactory = null;
+ /** The name of the service this message receiver is bound to. */
+ final String serviceName;
/** Metrics collector */
private MetricsCollector metrics = null;
@@ -60,13 +62,15 @@
* @param jmsConFac the JMS connection factory we are associated with
* @param workerPool the worker thread pool to be used
* @param cfgCtx the axis ConfigurationContext
+ * @param serviceName the name of the Axis service
*/
JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac,
- WorkerPool workerPool, ConfigurationContext cfgCtx) {
+ WorkerPool workerPool, ConfigurationContext cfgCtx,
String serviceName) {
this.jmsListener = jmsListener;
this.jmsConnectionFactory = jmsConFac;
this.workerPool = workerPool;
this.cfgCtx = cfgCtx;
+ this.serviceName = serviceName;
this.metrics = jmsListener.getMetricsCollector();
}
@@ -156,16 +160,6 @@
} catch (JMSException ignore) {}
try {
- Destination dest = message.getJMSDestination();
- String destinationName = null;
- if (dest instanceof Queue) {
- destinationName = ((Queue) dest).getQueueName();
- } else if (dest instanceof Topic) {
- destinationName = ((Topic) dest).getTopicName();
- }
-
- String serviceName =
- jmsConnectionFactory.getServiceNameForDestination(dest,
destinationName);
String soapAction = JMSUtils.getInstace().
getProperty(message, BaseConstants.SOAPACTION);
AxisService service = null;
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java?rev=688410&r1=688409&r2=688410&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSSender.java Sat Aug 23 13:43:57 2008
@@ -556,7 +556,7 @@
Parameter conFacParams = (Parameter) conFacIter.next();
JMSConnectionFactory jmsConFactory =
- new JMSConnectionFactory(conFacParams.getName(), cfgCtx);
+ new JMSConnectionFactory(conFacParams.getName(), null, null,
cfgCtx);
JMSUtils.setConnectionFactoryParameters(conFacParams,
jmsConFactory);
try {