Author: mriou
Date: Mon Aug 18 16:49:49 2008
New Revision: 686898
URL: http://svn.apache.org/viewvc?rev=686898&view=rev
Log:
ODE-345 (on trunk) Allow BPEL Processes To Be Provided Over JMS
Modified:
ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
ode/trunk/axis2/src/main/java/org/apache/ode/axis2/hooks/ODEAxisService.java
Modified: ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL:
http://svn.apache.org/viewvc/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?rev=686898&r1=686897&r2=686898&view=diff
==============================================================================
--- ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java (original)
+++ ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java Mon Aug
18 16:49:49 2008
@@ -52,6 +52,7 @@
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy;
+import org.apache.ode.bpel.engine.SharedEndpoints;
import org.apache.ode.bpel.evtproc.DebugBpelEventListener;
import org.apache.ode.bpel.extvar.jdbc.JdbcExternalVariableModule;
import org.apache.ode.bpel.iapi.BpelEventListener;
@@ -302,15 +303,20 @@
}
public ODEService createService(ProcessConf pconf, QName serviceName,
String portName) throws AxisFault {
- destroyService(serviceName, portName);
- AxisService axisService = ODEAxisService.createService(_axisConfig,
pconf, serviceName, portName);
+ // Since multiple processes may provide services at the same (JMS)
endpoint, qualify
+ // the (JMS) endpoint-specific NCName with a process-relative URI, if
necessary.
+ QName uniqueServiceName = new QName(
+ ODEAxisService.extractServiceName(pconf, serviceName,
portName));
+
+ destroyService(uniqueServiceName, portName);
+ AxisService axisService = ODEAxisService.createService(
+ _axisConfig, pconf, serviceName, portName,
uniqueServiceName.getLocalPart());
ODEService odeService = new ODEService(axisService, pconf,
serviceName, portName, _server);
- _services.put(serviceName, portName, odeService);
+ _services.put(uniqueServiceName, portName, odeService);
// Setting our new service on the receiver, the same receiver handles
- // all
- // operations so the first one should fit them all
+ // all operations so the first one should fit them all
AxisOperation firstOp = (AxisOperation)
axisService.getOperations().next();
((ODEMessageReceiver)
firstOp.getMessageReceiver()).setService(odeService);
Modified:
ode/trunk/axis2/src/main/java/org/apache/ode/axis2/hooks/ODEAxisService.java
URL:
http://svn.apache.org/viewvc/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/hooks/ODEAxisService.java?rev=686898&r1=686897&r2=686898&view=diff
==============================================================================
---
ode/trunk/axis2/src/main/java/org/apache/ode/axis2/hooks/ODEAxisService.java
(original)
+++
ode/trunk/axis2/src/main/java/org/apache/ode/axis2/hooks/ODEAxisService.java
Mon Aug 18 16:49:49 2008
@@ -20,8 +20,8 @@
package org.apache.ode.axis2.hooks;
import java.io.FileNotFoundException;
-import java.io.InputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
@@ -40,9 +40,11 @@
import org.apache.axis2.deployment.ServiceBuilder;
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
import org.apache.axis2.description.WSDL11ToAxisServiceBuilder;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.MessageReceiver;
+import org.apache.axis2.transport.jms.JMSConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.axis2.OdeFault;
@@ -62,9 +64,8 @@
private static final Log LOG = LogFactory.getLog(ODEAxisService.class);
public static AxisService createService(AxisConfiguration axisConfig,
ProcessConf pconf, QName wsdlServiceName,
- String portName) throws AxisFault {
+ String portName, String
axisServiceName) throws AxisFault {
Definition wsdlDefinition =
pconf.getDefinitionForService(wsdlServiceName);
- String serviceName = extractServiceName(wsdlDefinition,
wsdlServiceName, portName);
if (LOG.isDebugEnabled()) {
LOG.debug("Create AxisService:"+" service="+wsdlServiceName+"
port="+portName
@@ -81,7 +82,7 @@
serviceBuilder.setServerSide(true);
AxisService axisService = serviceBuilder.populateService();
- axisService.setName(serviceName);
+ axisService.setName(axisServiceName);
axisService.setWsdlFound(true);
axisService.setCustomWsdl(true);
axisService.setClassLoader(axisConfig.getServiceClassLoader());
@@ -105,7 +106,7 @@
// In doc/lit we need to declare a mapping between operations and
message element names
// to be able to route properly.
- declarePartsElements(wsdlDefinition, wsdlServiceName, serviceName,
portName);
+ declarePartsElements(wsdlDefinition, wsdlServiceName,
axisServiceName, portName);
Iterator operations = axisService.getOperations();
ODEMessageReceiver msgReceiver = new ODEMessageReceiver();
@@ -115,6 +116,13 @@
operation.setMessageReceiver(msgReceiver);
}
}
+
+ // Set the JMS destination name on the Axis Service
+ if (isJmsEndpoint(pconf, wsdlServiceName, portName)) {
+ axisService.addParameter(new
Parameter(JMSConstants.DEST_PARAM,
+ extractJMSDestinationName(axisServiceName,
deriveBaseServiceUri(pconf))));
+ }
+
return axisService;
} catch (Exception e) {
throw AxisFault.makeFault(e);
@@ -138,57 +146,129 @@
return axisService;
}
- private static String extractServiceName(Definition wsdlDefinition, QName
wsdlServiceName, String portName)
- throws AxisFault {
- String url = null;
- Service service = wsdlDefinition.getService(wsdlServiceName);
- if (service == null) {
- throw new OdeFault("Unable to find service " + wsdlServiceName + "
from service WSDL definition "
- + wsdlDefinition.getDocumentBaseURI());
- }
- Port port = service.getPort(portName);
- if (port == null) {
- throw new OdeFault("Couldn't find port " + portName + " in
definition " + wsdlServiceName);
- }
- for (Object oext : port.getExtensibilityElements()) {
- if (oext instanceof SOAPAddress)
- url = ((SOAPAddress) oext).getLocationURI();
- }
- if (url == null) {
- throw new OdeFault("Could not extract any soap:address from
service WSDL definition " + wsdlServiceName
- + " (necessary to establish the process target address)!");
- }
- String serviceName = parseURLForService(url);
- if (serviceName == null) {
- throw new OdeFault("The soap:address used for service WSDL
definition " + wsdlServiceName + " and port "
- + portName + " should be of the form
http://hostname:port/ode/processes/myProcessEndpointName");
- }
- return serviceName;
- }
-
/**
- * Obtain the service name from the request URL. The request URL is
expected to use the path "/processes/" under
- * which all processes and their services are listed. Returns null if the
path does not contain this part.
+ * Extract the JMS destination name that is embedded in the Axis service
name.
+ * @param serviceName the name of the axis service
+ * @return the corresponding JMS destination name
*/
- protected static String parseURLForService(String path) {
- int index = path.indexOf("/processes/");
- if (-1 != index) {
- String service;
-
- int serviceStart = index + "/processes/".length();
- if (path.length() > serviceStart + 1) {
- service = path.substring(serviceStart);
- // Path may contain query string, not interesting for us.
- int queryIndex = service.indexOf('?');
- if (queryIndex > 0) {
- service = service.substring(0, queryIndex);
- }
- return service;
- }
- }
- return null;
- }
+    private static String extractJMSDestinationName(String serviceName, String baseUri) {
+        // Recovers the JMS destination name from an Axis service name.
+        // serviceName: the (possibly qualified) Axis service name.
+        // baseUri: the qualifier that was prepended to the destination, followed by "/".
+        //
+        // Names containing "dynamicQueues/" or "dynamicTopics/" are returned from that
+        // marker onward, marker included.
+        String destinationPrefix = "dynamicQueues/";
+        int index = serviceName.indexOf(destinationPrefix);
+        if (index == -1) {
+            destinationPrefix = "dynamicTopics/";
+            index = serviceName.indexOf(destinationPrefix);
+        }
+        if (index == -1) {
+            // Otherwise the destination is whatever follows the base-URI qualifier. The
+            // qualifier itself must be skipped: starting the substring at its first
+            // character would hand back the qualified service name unchanged.
+            // Plain concatenation is kept so the prefix matches a name built as
+            // baseUri + "/" + destination, even when baseUri is null.
+            destinationPrefix = baseUri + "/";
+            index = serviceName.indexOf(destinationPrefix);
+            if (index != -1) {
+                index += destinationPrefix.length();
+            }
+        }
+        // No recognizable prefix: the service name is used as the destination as-is.
+        return (index != -1) ? serviceName.substring(index) : serviceName;
+    }
+
+ private static String extractEndpointUri(ProcessConf pconf, QName
wsdlServiceName, String portName)
+ throws AxisFault { // Returns the soap:address location URI declared for the given WSDL service and port.
+ Definition wsdlDefinition =
pconf.getDefinitionForService(wsdlServiceName);
+ String url = null;
+ Service service = wsdlDefinition.getService(wsdlServiceName);
+ if (service == null) { // the definition does not declare the requested service
+ throw new OdeFault("Unable to find service " +
wsdlServiceName + " from service WSDL definition "
+ + wsdlDefinition.getDocumentBaseURI());
+ }
+ Port port = service.getPort(portName);
+ if (port == null) { // the service exists but has no port with this name
+ throw new OdeFault("Couldn't find port " + portName + " in
definition " + wsdlServiceName);
+ }
+ for (Object oext : port.getExtensibilityElements()) {
+ if (oext instanceof SOAPAddress) // no break: when several soap:address elements exist, the last one wins
+ url = ((SOAPAddress) oext).getLocationURI();
+ }
+ if (url == null) { // no soap:address found, or its location URI was null
+ throw new OdeFault("Could not extract any soap:address from
service WSDL definition " + wsdlServiceName
+ + " (necessary to establish the process target
address)!");
+ }
+ return url; // never null at this point
+}
+ private static boolean isJmsEndpoint(ProcessConf pconf, QName
wsdlServiceName, String portName)
+ throws AxisFault { // Tells whether the port's soap:address uses the "jms:" scheme.
+ String url = extractEndpointUri(pconf, wsdlServiceName,
portName);
+ return url.startsWith("jms:"); // case-sensitive prefix test on the raw location URI
+ }
+
+ public static String extractServiceName(ProcessConf pconf, QName
wsdlServiceName, String portName)
+ throws AxisFault { // Derives the Axis service name from the port's soap:address via parseURLForService.
+ String serviceName =
parseURLForService(extractEndpointUri(pconf, wsdlServiceName, portName),
deriveBaseServiceUri(pconf));
+ if (serviceName == null) { // the address carried no usable service part; NOTE(review): the message below only describes the HTTP form although jms: addresses are parsed too
+ throw new OdeFault("The soap:address used for service WSDL
definition " + wsdlServiceName + " and port "
+ + portName + " should be of the form
http://hostname:port/ode/processes/myProcessEndpointName");
+ }
+ return serviceName;
+ }
+
+ /**
+ * Obtain the service name from an endpoint URL.
+ *
+ * For paths that do not start with "jms", the service name is the text that follows
+ * the first "/processes/" in the path. For paths that start with "jms", it is the text
+ * that follows the first "jms:/", qualified as baseUri + "/" + name.
+ * In both cases anything from the first '?' on is dropped, provided the '?' is not the
+ * first character of the name.
+ *
+ * @param path the endpoint URL to parse
+ * @param baseUri qualifier prepended to names taken from "jms" paths; not used otherwise
+ * @return the service name, or null if the path does not contain the expected prefix
+ * or fewer than two characters follow it
+ */
+ protected static String parseURLForService(String path, String baseUri)
{
+ // Assume that path is HTTP-based, by default
+ String servicePrefix = "/processes/";
+ // Don't assume JMS-based paths start the same way
+ if (path.startsWith("jms")) {
+ servicePrefix = "jms:/";
+ }
+ int index = path.indexOf(servicePrefix);
+ if (-1 != index) {
+ String service;
+
+ int serviceStart = index + servicePrefix.length();
+ if (path.length() > serviceStart + 1) { // at least two characters must follow the prefix
+ service = path.substring(serviceStart);
+ // Path may contain query string, not interesting for
us.
+ int queryIndex = service.indexOf('?');
+ if (queryIndex > 0) { // a '?' at position 0 is left in place
+ service = service.substring(0, queryIndex);
+ }
+ // Qualify shared JMS names with unique baseUri
+ if (path.startsWith("jms")) {
+ service = baseUri + "/" + service; // NOTE(review): a null baseUri is concatenated as the text "null"; deriveBaseServiceUri can return null
+ }
+ return service;
+ }
+ }
+ return null; // prefix missing, or too little text after it
+ }
+
+    /**
+     * Generates a URI of the following form:
+     *     ${deploy_bundleNcName}/${diagram_relativeURL}/${process_relativeURL}
+     * When a service name (local part only) is qualified (prefixed) with the above,
+     * it results in a unique identifier that may be used as that service's name.
+     *
+     * @param pconf the process configuration to derive the URI from; may be null
+     * @return the base URI, or null if pconf is null or lacks a package name,
+     *         a BPEL document name or a process type
+     */
+    public static String deriveBaseServiceUri(ProcessConf pconf) {
+        if (pconf == null) {
+            return null;
+        }
+        String bundleName = pconf.getPackage();
+        if (bundleName == null) {
+            return null;
+        }
+        String diagramName = pconf.getBpelDocument();
+        if (diagramName == null) {
+            return null;
+        }
+        // Keep only the text before the first '.' (presumably the file extension).
+        // substring()'s end index is exclusive, so it must be the position of the dot
+        // itself; subtracting one would also cut off the last character of the name.
+        int dotIndex = diagramName.indexOf('.');
+        if (dotIndex > 0) {
+            diagramName = diagramName.substring(0, dotIndex);
+        }
+        String processName = pconf.getType() != null ? pconf.getType().getLocalPart() : null;
+        if (processName == null) {
+            return null;
+        }
+        return bundleName + "/" + diagramName + "/" + processName;
+    }
+
private static void declarePartsElements(Definition wsdlDefinition, QName
wsdlServiceName, String axisServiceName,
String portName) {
List wsldOps =
wsdlDefinition.getService(wsdlServiceName).getPort(portName).getBinding().getPortType()