Author: antelder
Date: Thu May 14 10:16:25 2009
New Revision: 774720

URL: http://svn.apache.org/viewvc?rev=774720&view=rev
Log:
Add support for using Tuscany threads instead of setMessageListener in a JEE 
container environment

Modified:
    
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java
    
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java
    
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java
    
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java
    
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java
    
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
    
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java

Modified: 
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java
URL: 
http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java?rev=774720&r1=774719&r2=774720&view=diff
==============================================================================
--- 
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java
 (original)
+++ 
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java
 Thu May 14 10:16:25 2009
@@ -31,6 +31,7 @@
 import org.apache.tuscany.sca.binding.ws.wsdlgen.BindingWSDLGenerator;
 import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint;
 import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
 import org.apache.tuscany.sca.databinding.DataBindingExtensionPoint;
 import org.apache.tuscany.sca.host.http.ServletHost;
 import org.apache.tuscany.sca.host.http.ServletHostExtensionPoint;
@@ -40,6 +41,7 @@
 import org.apache.tuscany.sca.provider.ServiceBindingProvider;
 import org.apache.tuscany.sca.runtime.RuntimeComponent;
 import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+import org.apache.tuscany.sca.work.WorkScheduler;
 
 /**
  * The service binding provider for the remote sca binding implementation. 
Relies on the 
@@ -69,6 +71,8 @@
         ModelFactoryExtensionPoint modelFactories = 
extensionPoints.getExtensionPoint(ModelFactoryExtensionPoint.class);
         MessageFactory messageFactory = 
modelFactories.getFactory(MessageFactory.class); 
         DataBindingExtensionPoint dataBindings = 
extensionPoints.getExtensionPoint(DataBindingExtensionPoint.class);
+        UtilityExtensionPoint utilities = 
extensionPoints.getExtensionPoint(UtilityExtensionPoint.class);
+        WorkScheduler workScheduler = 
utilities.getUtility(WorkScheduler.class);
 
         this.binding = binding.getSCABinding();
         wsBinding = 
modelFactories.getFactory(WebServiceBindingFactory.class).createWebServiceBinding();
@@ -88,7 +92,8 @@
                                                    wsBinding,
                                                    servletHost,
                                                    messageFactory,
-                                                   policyHandlerClassnames);
+                                                   policyHandlerClassnames,
+                                                   workScheduler);
     }
 
     public InterfaceContract getBindingInterfaceContract() {

Modified: 
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java
URL: 
http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java?rev=774720&r1=774719&r2=774720&view=diff
==============================================================================
--- 
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java
 (original)
+++ 
tuscany/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java
 Thu May 14 10:16:25 2009
@@ -25,11 +25,13 @@
 import org.apache.tuscany.sca.assembly.SCABinding;
 import org.apache.tuscany.sca.binding.ws.WebServiceBinding;
 import org.apache.tuscany.sca.binding.ws.axis2.Axis2ServiceProvider;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
 import org.apache.tuscany.sca.host.http.ServletHost;
 import org.apache.tuscany.sca.invocation.MessageFactory;
 import org.apache.tuscany.sca.policy.util.PolicyHandlerTuple;
 import org.apache.tuscany.sca.runtime.RuntimeComponent;
 import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+import org.apache.tuscany.sca.work.WorkScheduler;
 
 /**
  * A specialization of the Axis2BindingProvider that just switches in the 
SCABinding model
@@ -58,14 +60,16 @@
                                    WebServiceBinding wsBinding,
                                    ServletHost servletHost,
                                    MessageFactory messageFactory,
-                                   List<PolicyHandlerTuple> 
policyHandlerClassnames)  {
+                                   List<PolicyHandlerTuple> 
policyHandlerClassnames,
+                                   WorkScheduler workScheduler)  {
         
         super(component, 
                 service, 
                 wsBinding, 
                 servletHost,
                 messageFactory,
-                policyHandlerClassnames);
+                policyHandlerClassnames,
+                workScheduler);
 
         this.binding = binding;
     }

Modified: 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java
URL: 
http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java?rev=774720&r1=774719&r2=774720&view=diff
==============================================================================
--- 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java
 (original)
+++ 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java
 Thu May 14 10:16:25 2009
@@ -23,6 +23,7 @@
 import org.apache.tuscany.sca.binding.ws.WebServiceBinding;
 import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint;
 import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
 import org.apache.tuscany.sca.databinding.DataBindingExtensionPoint;
 import org.apache.tuscany.sca.host.http.ServletHost;
 import org.apache.tuscany.sca.host.http.ServletHostExtensionPoint;
@@ -34,7 +35,7 @@
 import org.apache.tuscany.sca.runtime.RuntimeComponent;
 import org.apache.tuscany.sca.runtime.RuntimeComponentReference;
 import org.apache.tuscany.sca.runtime.RuntimeComponentService;
-import org.osoa.sca.ServiceRuntimeException;
+import org.apache.tuscany.sca.work.WorkScheduler;
 
 /**
  * Axis2BindingProviderFactory
@@ -48,6 +49,7 @@
     private ServletHost servletHost;
     private List<PolicyHandlerTuple> policyHandlerClassnames = null;
     private DataBindingExtensionPoint dataBindings;
+    private WorkScheduler workScheduler;
 
     public Axis2BindingProviderFactory(ExtensionPointRegistry extensionPoints) 
{
         ServletHostExtensionPoint servletHosts = 
extensionPoints.getExtensionPoint(ServletHostExtensionPoint.class);
@@ -58,6 +60,8 @@
         modelFactories = 
extensionPoints.getExtensionPoint(ModelFactoryExtensionPoint.class);
         policyHandlerClassnames = 
PolicyHandlerDefinitionsLoader.loadPolicyHandlerClassnames();
         dataBindings = 
extensionPoints.getExtensionPoint(DataBindingExtensionPoint.class);
+        UtilityExtensionPoint utilities = 
extensionPoints.getExtensionPoint(UtilityExtensionPoint.class);
+        workScheduler = utilities.getUtility(WorkScheduler.class);
     }
 
     public ReferenceBindingProvider 
createReferenceBindingProvider(RuntimeComponent component,
@@ -72,7 +76,7 @@
                                                                
WebServiceBinding binding) {
         return new Axis2ServiceBindingProvider(component, service, binding,
                                                servletHost, modelFactories,
-                                               policyHandlerClassnames, 
dataBindings);
+                                               policyHandlerClassnames, 
dataBindings, workScheduler);
     }
     
     public Class<WebServiceBinding> getModelType() {

Modified: 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java
URL: 
http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java?rev=774720&r1=774719&r2=774720&view=diff
==============================================================================
--- 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java
 (original)
+++ 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java
 Thu May 14 10:16:25 2009
@@ -31,6 +31,7 @@
 import org.apache.tuscany.sca.provider.ServiceBindingProvider;
 import org.apache.tuscany.sca.runtime.RuntimeComponent;
 import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+import org.apache.tuscany.sca.work.WorkScheduler;
 import org.osoa.sca.ServiceRuntimeException;
 
 public class Axis2ServiceBindingProvider implements ServiceBindingProvider {
@@ -44,7 +45,8 @@
                                        ServletHost servletHost,
                                        ModelFactoryExtensionPoint 
modelFactories,
                                        List<PolicyHandlerTuple> 
policyHandlerClassnames,
-                                       DataBindingExtensionPoint dataBindings) 
{
+                                       DataBindingExtensionPoint dataBindings,
+                                       WorkScheduler workScheduler) {
 
         if (servletHost == null) {
             throw new ServiceRuntimeException("No Servlet host is avaible for 
HTTP web services");
@@ -62,7 +64,7 @@
         InterfaceContract contract = wsBinding.getBindingInterfaceContract();
         contract.getInterface().resetDataBinding(OMElement.class.getName());
 
-        axisProvider = new Axis2ServiceProvider(component, service, wsBinding, 
servletHost, messageFactory, policyHandlerClassnames);
+        axisProvider = new Axis2ServiceProvider(component, service, wsBinding, 
servletHost, messageFactory, policyHandlerClassnames, workScheduler);
     }
 
     public void start() {

Modified: 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java
URL: 
http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java?rev=774720&r1=774719&r2=774720&view=diff
==============================================================================
--- 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java
 (original)
+++ 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java
 Thu May 14 10:16:25 2009
@@ -100,6 +100,7 @@
 import org.apache.tuscany.sca.runtime.RuntimeComponent;
 import org.apache.tuscany.sca.runtime.RuntimeComponentService;
 import org.apache.tuscany.sca.runtime.RuntimeWire;
+import org.apache.tuscany.sca.work.WorkScheduler;
 import org.apache.tuscany.sca.xsd.XSDefinition;
 import org.apache.ws.commons.schema.XmlSchema;
 import org.apache.ws.commons.schema.XmlSchemaExternal;
@@ -134,6 +135,7 @@
     private BasicAuthenticationPolicy basicAuthenticationPolicy = null;
     private Axis2TokenAuthenticationPolicy axis2TokenAuthenticationPolicy = 
null;
     private List<Axis2HeaderPolicy> axis2HeaderPolicies = new 
ArrayList<Axis2HeaderPolicy>();
+    private WorkScheduler workScheduler;
 
     public static final QName QNAME_WSA_ADDRESS =
         new QName(AddressingConstants.Final.WSA_NAMESPACE, 
AddressingConstants.EPR_ADDRESS);
@@ -166,7 +168,8 @@
                                 WebServiceBinding wsBinding,
                                 ServletHost servletHost,
                                 MessageFactory messageFactory,
-                                List<PolicyHandlerTuple> 
policyHandlerClassnames) {
+                                List<PolicyHandlerTuple> 
policyHandlerClassnames,
+                                WorkScheduler workScheduler) {
 
         this.component = component; 
         this.contract = contract; 
@@ -174,6 +177,7 @@
         this.servletHost = servletHost;
         this.messageFactory = messageFactory;
         this.policyHandlerClassnames = policyHandlerClassnames;
+        this.workScheduler = workScheduler;
 
         final boolean isRampartRequired = 
AxisPolicyHelper.isRampartRequired(wsBinding);
         try {
@@ -324,7 +328,7 @@
                 } else if (endpointURL.startsWith("jms")) {
                     logger.log(Level.INFO,"Axis2 JMS URL=" + endpointURL);
                     
-                    jmsListener = new JMSListener();
+                    jmsListener = new JMSListener(workScheduler);
                     jmsSender = new JMSSender();
                     ListenerManager listenerManager = 
configContext.getListenerManager();
                     TransportInDescription trsIn = 
configContext.getAxisConfiguration().getTransportIn(Constants.TRANSPORT_JMS);

Modified: 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
URL: 
http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java?rev=774720&r1=774719&r2=774720&view=diff
==============================================================================
--- 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
 (original)
+++ 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
 Thu May 14 10:16:25 2009
@@ -28,6 +28,7 @@
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.Queue;
 import javax.jms.Session;
@@ -42,6 +43,7 @@
 import org.apache.axis2.transport.jms.JMSUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.work.WorkScheduler;
 
 /**
  * Encapsulate a JMS Connection factory definition within an Axis2.xml
@@ -147,19 +149,23 @@
      */
     private String pass = null;
     
+    private WorkScheduler workScheduler;
+    private boolean consumerRunning;
+
     /**
      * Create a JMSConnectionFactory for the given Axis2 name and JNDI name
      *
      * @param name     the local Axis2 name of the connection factory
      * @param jndiName the JNDI name of the actual connection factory used
      */
-    JMSConnectionFactory(String name, String jndiName) {
+    JMSConnectionFactory(String name, String jndiName, WorkScheduler 
workScheduler) {
         this.name = name;
         this.jndiName = jndiName;
         serviceJNDINameMapping = new HashMap();
         serviceDestinationMapping = new HashMap();
         properties = new Hashtable();
         jmsSessions = new HashMap();
+        this.workScheduler = workScheduler;
     }
 
     /**
@@ -167,8 +173,8 @@
      *
      * @param name the local Axis2 name of the connection factory
      */
-    JMSConnectionFactory(String name) {
-        this(name, null);
+    JMSConnectionFactory(String name, WorkScheduler workScheduler) {
+        this(name, null, workScheduler);
     }
 
     /**
@@ -425,7 +431,9 @@
         }
 
         // start the connection
-        connection.start();
+        if (!consumerRunning) {
+            connection.start();
+        }
         log.info("Connection factory : " + name + " initialized...");
     }
 
@@ -458,10 +466,50 @@
         }
 
         MessageConsumer consumer = session.createConsumer(destination);
-        consumer.setMessageListener(this.msgRcvr);
+//        consumer.setMessageListener(this.msgRcvr); replace with new Tuscany 
method:
+        registerMessageReceiver(consumer, this.msgRcvr);
         jmsSessions.put(destinationJndi, session);
     }
 
+    private void registerMessageReceiver(final MessageConsumer consumer, final 
JMSMessageReceiver messageReceiver) throws JMSException {
+
+        try {
+
+            consumer.setMessageListener(messageReceiver);
+
+        } catch (javax.jms.JMSException e) {
+
+            // setMessageListener not allowed in JEE container so use Tuscany 
threads
+
+            connection.start();
+            consumerRunning = true;
+            
+            workScheduler.scheduleWork(new Runnable() {
+
+                public void run() {
+                    try {
+                        while (consumerRunning) {
+                            final Message msg = consumer.receive();
+                            if (msg != null) {
+                                workScheduler.scheduleWork(new Runnable() {
+                                    public void run() {
+                                        try {
+                                            messageReceiver.onMessage(msg);
+                                        } catch (Exception e) {
+                                            log.error("Exception on message 
receiver thread", e);
+                                        }
+                                    }
+                                });
+                            }
+                        }
+                    } catch (Exception e) {
+                        log.error("Exception on consumer receive thread", e);
+                    }
+                }
+            });
+        }
+    }
+
     /**
      * Stop listening on the given destination - for undeployment of services
      *
@@ -488,6 +536,7 @@
      */
     public void stop() {
         try {
+            consumerRunning = false;
             connection.close();
         } catch (JMSException e) {
             log.warn("Error shutting down connection factory : " + name, e);

Modified: 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
URL: 
http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java?rev=774720&r1=774719&r2=774720&view=diff
==============================================================================
--- 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
 (original)
+++ 
tuscany/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
 Thu May 14 10:16:25 2009
@@ -49,6 +49,7 @@
 import org.apache.axis2.transport.jms.JMSUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.work.WorkScheduler;
 
 import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
 import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
@@ -109,6 +110,12 @@
     
     private ExecutorService workerPool;
 
+    private WorkScheduler workScheduler;
+
+    public JMSListener(WorkScheduler workScheduler) {
+        this.workScheduler = workScheduler;
+    }
+    
     /**
      * This is the TransportListener initialization method invoked by Axis2
      *
@@ -221,7 +228,7 @@
 
             Parameter param = (Parameter) conFacIter.next();
             JMSConnectionFactory jmsConFactory =
-                    new JMSConnectionFactory(param.getName());
+                    new JMSConnectionFactory(param.getName(), workScheduler);
 
             ParameterIncludeImpl pi = new ParameterIncludeImpl();
             try {


Reply via email to