Author: antelder
Date: Thu May 14 10:16:25 2009
New Revision: 774720
URL: http://svn.apache.org/viewvc?rev=774720&view=rev
Log:
Add support for using Tuscany threads instead on setMessageListener in a JEE
container environment
Modified:
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
Modified:
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java
URL:
http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java?rev=774720&r1=774719&r2=774720&view=diff
==============================================================================
---
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java
(original)
+++
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java
Thu May 14 10:16:25 2009
@@ -31,6 +31,7 @@
import org.apache.tuscany.sca.binding.ws.wsdlgen.BindingWSDLGenerator;
import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.databinding.DataBindingExtensionPoint;
import org.apache.tuscany.sca.host.http.ServletHost;
import org.apache.tuscany.sca.host.http.ServletHostExtensionPoint;
@@ -40,6 +41,7 @@
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+import org.apache.tuscany.sca.work.WorkScheduler;
/**
* The service binding provider for the remote sca binding implementation.
Relies on the
@@ -69,6 +71,8 @@
ModelFactoryExtensionPoint modelFactories =
extensionPoints.getExtensionPoint(ModelFactoryExtensionPoint.class);
MessageFactory messageFactory =
modelFactories.getFactory(MessageFactory.class);
DataBindingExtensionPoint dataBindings =
extensionPoints.getExtensionPoint(DataBindingExtensionPoint.class);
+ UtilityExtensionPoint utilities =
extensionPoints.getExtensionPoint(UtilityExtensionPoint.class);
+ WorkScheduler workScheduler =
utilities.getUtility(WorkScheduler.class);
this.binding = binding.getSCABinding();
wsBinding =
modelFactories.getFactory(WebServiceBindingFactory.class).createWebServiceBinding();
@@ -88,7 +92,8 @@
wsBinding,
servletHost,
messageFactory,
- policyHandlerClassnames);
+ policyHandlerClassnames,
+ workScheduler);
}
public InterfaceContract getBindingInterfaceContract() {
Modified:
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java
URL:
http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java?rev=774720&r1=774719&r2=774720&view=diff
==============================================================================
---
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java
(original)
+++
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java
Thu May 14 10:16:25 2009
@@ -25,11 +25,13 @@
import org.apache.tuscany.sca.assembly.SCABinding;
import org.apache.tuscany.sca.binding.ws.WebServiceBinding;
import org.apache.tuscany.sca.binding.ws.axis2.Axis2ServiceProvider;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.host.http.ServletHost;
import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.policy.util.PolicyHandlerTuple;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+import org.apache.tuscany.sca.work.WorkScheduler;
/**
* A specialization of the Axis2BindingProvider that just switches in the
SCABinding model
@@ -58,14 +60,16 @@
WebServiceBinding wsBinding,
ServletHost servletHost,
MessageFactory messageFactory,
- List<PolicyHandlerTuple>
policyHandlerClassnames) {
+ List<PolicyHandlerTuple>
policyHandlerClassnames,
+ WorkScheduler workScheduler) {
super(component,
service,
wsBinding,
servletHost,
messageFactory,
- policyHandlerClassnames);
+ policyHandlerClassnames,
+ workScheduler);
this.binding = binding;
}
Modified:
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java
URL:
http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java?rev=774720&r1=774719&r2=774720&view=diff
==============================================================================
---
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java
(original)
+++
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java
Thu May 14 10:16:25 2009
@@ -23,6 +23,7 @@
import org.apache.tuscany.sca.binding.ws.WebServiceBinding;
import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.databinding.DataBindingExtensionPoint;
import org.apache.tuscany.sca.host.http.ServletHost;
import org.apache.tuscany.sca.host.http.ServletHostExtensionPoint;
@@ -34,7 +35,7 @@
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentReference;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
-import org.osoa.sca.ServiceRuntimeException;
+import org.apache.tuscany.sca.work.WorkScheduler;
/**
* Axis2BindingProviderFactory
@@ -48,6 +49,7 @@
private ServletHost servletHost;
private List<PolicyHandlerTuple> policyHandlerClassnames = null;
private DataBindingExtensionPoint dataBindings;
+ private WorkScheduler workScheduler;
public Axis2BindingProviderFactory(ExtensionPointRegistry extensionPoints)
{
ServletHostExtensionPoint servletHosts =
extensionPoints.getExtensionPoint(ServletHostExtensionPoint.class);
@@ -58,6 +60,8 @@
modelFactories =
extensionPoints.getExtensionPoint(ModelFactoryExtensionPoint.class);
policyHandlerClassnames =
PolicyHandlerDefinitionsLoader.loadPolicyHandlerClassnames();
dataBindings =
extensionPoints.getExtensionPoint(DataBindingExtensionPoint.class);
+ UtilityExtensionPoint utilities =
extensionPoints.getExtensionPoint(UtilityExtensionPoint.class);
+ workScheduler = utilities.getUtility(WorkScheduler.class);
}
public ReferenceBindingProvider
createReferenceBindingProvider(RuntimeComponent component,
@@ -72,7 +76,7 @@
WebServiceBinding binding) {
return new Axis2ServiceBindingProvider(component, service, binding,
servletHost, modelFactories,
- policyHandlerClassnames,
dataBindings);
+ policyHandlerClassnames,
dataBindings, workScheduler);
}
public Class<WebServiceBinding> getModelType() {
Modified:
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java
URL:
http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java?rev=774720&r1=774719&r2=774720&view=diff
==============================================================================
---
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java
(original)
+++
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java
Thu May 14 10:16:25 2009
@@ -31,6 +31,7 @@
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+import org.apache.tuscany.sca.work.WorkScheduler;
import org.osoa.sca.ServiceRuntimeException;
public class Axis2ServiceBindingProvider implements ServiceBindingProvider {
@@ -44,7 +45,8 @@
ServletHost servletHost,
ModelFactoryExtensionPoint
modelFactories,
List<PolicyHandlerTuple>
policyHandlerClassnames,
- DataBindingExtensionPoint dataBindings)
{
+ DataBindingExtensionPoint dataBindings,
+ WorkScheduler workScheduler) {
if (servletHost == null) {
throw new ServiceRuntimeException("No Servlet host is avaible for
HTTP web services");
@@ -62,7 +64,7 @@
InterfaceContract contract = wsBinding.getBindingInterfaceContract();
contract.getInterface().resetDataBinding(OMElement.class.getName());
- axisProvider = new Axis2ServiceProvider(component, service, wsBinding,
servletHost, messageFactory, policyHandlerClassnames);
+ axisProvider = new Axis2ServiceProvider(component, service, wsBinding,
servletHost, messageFactory, policyHandlerClassnames, workScheduler);
}
public void start() {
Modified:
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java
URL:
http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java?rev=774720&r1=774719&r2=774720&view=diff
==============================================================================
---
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java
(original)
+++
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java
Thu May 14 10:16:25 2009
@@ -100,6 +100,7 @@
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
import org.apache.tuscany.sca.runtime.RuntimeWire;
+import org.apache.tuscany.sca.work.WorkScheduler;
import org.apache.tuscany.sca.xsd.XSDefinition;
import org.apache.ws.commons.schema.XmlSchema;
import org.apache.ws.commons.schema.XmlSchemaExternal;
@@ -134,6 +135,7 @@
private BasicAuthenticationPolicy basicAuthenticationPolicy = null;
private Axis2TokenAuthenticationPolicy axis2TokenAuthenticationPolicy =
null;
private List<Axis2HeaderPolicy> axis2HeaderPolicies = new
ArrayList<Axis2HeaderPolicy>();
+ private WorkScheduler workScheduler;
public static final QName QNAME_WSA_ADDRESS =
new QName(AddressingConstants.Final.WSA_NAMESPACE,
AddressingConstants.EPR_ADDRESS);
@@ -166,7 +168,8 @@
WebServiceBinding wsBinding,
ServletHost servletHost,
MessageFactory messageFactory,
- List<PolicyHandlerTuple>
policyHandlerClassnames) {
+ List<PolicyHandlerTuple>
policyHandlerClassnames,
+ WorkScheduler workScheduler) {
this.component = component;
this.contract = contract;
@@ -174,6 +177,7 @@
this.servletHost = servletHost;
this.messageFactory = messageFactory;
this.policyHandlerClassnames = policyHandlerClassnames;
+ this.workScheduler = workScheduler;
final boolean isRampartRequired =
AxisPolicyHelper.isRampartRequired(wsBinding);
try {
@@ -324,7 +328,7 @@
} else if (endpointURL.startsWith("jms")) {
logger.log(Level.INFO,"Axis2 JMS URL=" + endpointURL);
- jmsListener = new JMSListener();
+ jmsListener = new JMSListener(workScheduler);
jmsSender = new JMSSender();
ListenerManager listenerManager =
configContext.getListenerManager();
TransportInDescription trsIn =
configContext.getAxisConfiguration().getTransportIn(Constants.TRANSPORT_JMS);
Modified:
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
URL:
http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java?rev=774720&r1=774719&r2=774720&view=diff
==============================================================================
---
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
(original)
+++
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
Thu May 14 10:16:25 2009
@@ -28,6 +28,7 @@
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
@@ -42,6 +43,7 @@
import org.apache.axis2.transport.jms.JMSUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.work.WorkScheduler;
/**
* Encapsulate a JMS Connection factory definition within an Axis2.xml
@@ -147,19 +149,23 @@
*/
private String pass = null;
+ private WorkScheduler workScheduler;
+ private boolean consumerRunning;
+
/**
* Create a JMSConnectionFactory for the given Axis2 name and JNDI name
*
* @param name the local Axis2 name of the connection factory
* @param jndiName the JNDI name of the actual connection factory used
*/
- JMSConnectionFactory(String name, String jndiName) {
+ JMSConnectionFactory(String name, String jndiName, WorkScheduler
workScheduler) {
this.name = name;
this.jndiName = jndiName;
serviceJNDINameMapping = new HashMap();
serviceDestinationMapping = new HashMap();
properties = new Hashtable();
jmsSessions = new HashMap();
+ this.workScheduler = workScheduler;
}
/**
@@ -167,8 +173,8 @@
*
* @param name the local Axis2 name of the connection factory
*/
- JMSConnectionFactory(String name) {
- this(name, null);
+ JMSConnectionFactory(String name, WorkScheduler workScheduler) {
+ this(name, null, workScheduler);
}
/**
@@ -425,7 +431,9 @@
}
// start the connection
- connection.start();
+ if (!consumerRunning) {
+ connection.start();
+ }
log.info("Connection factory : " + name + " initialized...");
}
@@ -458,10 +466,50 @@
}
MessageConsumer consumer = session.createConsumer(destination);
- consumer.setMessageListener(this.msgRcvr);
+// consumer.setMessageListener(this.msgRcvr); replace with new Tuscany
method:
+ registerMessageReceiver(consumer, this.msgRcvr);
jmsSessions.put(destinationJndi, session);
}
+ private void registerMessageReceiver(final MessageConsumer consumer, final
JMSMessageReceiver messageReceiver) throws JMSException {
+
+ try {
+
+ consumer.setMessageListener(messageReceiver);
+
+ } catch (javax.jms.JMSException e) {
+
+ // setMessageListener not allowed in JEE container so use Tuscany
threads
+
+ connection.start();
+ consumerRunning = true;
+
+ workScheduler.scheduleWork(new Runnable() {
+
+ public void run() {
+ try {
+ while (consumerRunning) {
+ final Message msg = consumer.receive();
+ if (msg != null) {
+ workScheduler.scheduleWork(new Runnable() {
+ public void run() {
+ try {
+ messageReceiver.onMessage(msg);
+ } catch (Exception e) {
+ log.error("Exception on message
receiver thread", e);
+ }
+ }
+ });
+ }
+ }
+ } catch (Exception e) {
+ log.error("Exception on consumer receive thread", e);
+ }
+ }
+ });
+ }
+ }
+
/**
* Stop listening on the given destination - for undeployment of services
*
@@ -488,6 +536,7 @@
*/
public void stop() {
try {
+ consumerRunning = false;
connection.close();
} catch (JMSException e) {
log.warn("Error shutting down connection factory : " + name, e);
Modified:
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
URL:
http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java?rev=774720&r1=774719&r2=774720&view=diff
==============================================================================
---
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
(original)
+++
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
Thu May 14 10:16:25 2009
@@ -49,6 +49,7 @@
import org.apache.axis2.transport.jms.JMSUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.work.WorkScheduler;
import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
@@ -109,6 +110,12 @@
private ExecutorService workerPool;
+ private WorkScheduler workScheduler;
+
+ public JMSListener(WorkScheduler workScheduler) {
+ this.workScheduler = workScheduler;
+ }
+
/**
* This is the TransportListener initialization method invoked by Axis2
*
@@ -221,7 +228,7 @@
Parameter param = (Parameter) conFacIter.next();
JMSConnectionFactory jmsConFactory =
- new JMSConnectionFactory(param.getName());
+ new JMSConnectionFactory(param.getName(), workScheduler);
ParameterIncludeImpl pi = new ParameterIncludeImpl();
try {