Author: gnodet
Date: Tue Jul 29 07:32:55 2008
New Revision: 680712
URL: http://svn.apache.org/viewvc?rev=680712&view=rev
Log:
SM-1484, SM-1387
* OSWorkflowEndpoint should extend ProviderEndpoint
* Fix: stopping one endpoint shut down the thread pool shared by all endpoints
* Use servicemix thread pool service
* Make the component OSGi friendly
Added:
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/META-INF/
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/META-INF/spring/
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/META-INF/spring/servicemix-osworkflow.xml
Removed:
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflowBootstrap.java
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/WorkflowManager.java
Modified:
servicemix/components/engines/servicemix-osworkflow/trunk/pom.xml
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflow.java
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflowEndpoint.java
Modified: servicemix/components/engines/servicemix-osworkflow/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-osworkflow/trunk/pom.xml?rev=680712&r1=680711&r2=680712&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-osworkflow/trunk/pom.xml (original)
+++ servicemix/components/engines/servicemix-osworkflow/trunk/pom.xml Tue Jul 29 07:32:55 2008
@@ -42,6 +42,19 @@
<previous.releases>3.1.2,3.2,3.2.1</previous.releases>
<servicemix-version>3.2.1</servicemix-version>
<servicemix-shared-version>4.0-SNAPSHOT</servicemix-shared-version>
+
+ <servicemix.osgi.import>
+ org.apache.servicemix.common,
+ org.apache.servicemix.common.osgi,
+ org.apache.servicemix.executors.impl,
+ org.apache.xbean.spring.context.v2,
+ org.springframework.beans.factory.xml,
+ *
+ </servicemix.osgi.import>
+ <servicemix.osgi.export>
+ org.apache.servicemix.osworkflow*;version=${project.version},
+ META-INF.services.org.apache.xbean.spring.http.servicemix.apache.org.osworkflow
+ </servicemix.osgi.export>
</properties>
<dependencies>
@@ -104,7 +117,6 @@
<extensions>true</extensions>
<configuration>
<type>service-engine</type>
- <bootstrap>org.apache.servicemix.osworkflow.OSWorkflowBootstrap</bootstrap>
<component>org.apache.servicemix.osworkflow.OSWorkflowComponent</component>
</configuration>
</plugin>
Modified:
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflow.java
URL:
http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflow.java?rev=680712&r1=680711&r2=680712&view=diff
==============================================================================
---
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflow.java
(original)
+++
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflow.java
Tue Jul 29 07:32:55 2008
@@ -36,7 +36,7 @@
/**
* @author lhe
*/
-public class OSWorkflow extends Thread {
+public class OSWorkflow implements Runnable {
public static final String KEY_EXCHANGE = "exchange";
@@ -89,8 +89,6 @@
*/
public OSWorkflow(OSWorkflowEndpoint ep, String workflowName, int action,
Map map, String caller, MessageExchange exchange) {
- super(workflowName);
- setDaemon(true);
this.endpoint = ep; // remember the endpoint which called the osworkflow
this.osWorkflowName = workflowName;
@@ -105,10 +103,7 @@
this.map.put(KEY_CALLER, this.caller);
this.map.put(KEY_IN_MESSAGE, this.exchange.getMessage("in"));
this.map.put(KEY_EXCHANGE, this.exchange);
- this.map
- .put(
- KEY_ASYNC_PROCESSING,
- this.exchange instanceof InOnly || this.exchange
instanceof RobustInOnly);
+ this.map.put(KEY_ASYNC_PROCESSING, this.exchange instanceof InOnly ||
this.exchange instanceof RobustInOnly);
}
/**
@@ -121,26 +116,26 @@
this.osWorkflowInstance = new BasicWorkflow(this.caller);
DefaultConfiguration config = new DefaultConfiguration();
this.osWorkflowInstance.setConfiguration(config);
- long wfId = this.osWorkflowInstance.initialize(
- this.osWorkflowName, this.action, this.map);
+ long wfId = this.osWorkflowInstance.initialize(this.osWorkflowName,
this.action, this.map);
return wfId;
}
/*
* (non-Javadoc)
*
- * @see java.lang.Thread#run()
+ * @see java.lang.Runnable#run()
*/
- @Override
public void run() {
// call the endpoint method for init actions
this.endpoint.preWorkflow();
- log.debug("Starting workflow...");
- log.debug("Name: " + this.osWorkflowName);
- log.debug("Action: " + this.action);
- log.debug("Caller: " + this.caller);
- log.debug("Map: " + this.map);
+ if (log.isDebugEnabled()) {
+ log.debug("Starting workflow...");
+ log.debug("Name: " + this.osWorkflowName);
+ log.debug("Action: " + this.action);
+ log.debug("Caller: " + this.caller);
+ log.debug("Map: " + this.map);
+ }
// loop as long as there are more actions to do and the workflow is not
// finished or aborted
@@ -157,8 +152,7 @@
}
// determine the available actions
- int[] availableActions = this.osWorkflowInstance
- .getAvailableActions(this.workflowId, this.map);
+ int[] availableActions =
this.osWorkflowInstance.getAvailableActions(this.workflowId, this.map);
// check if there are more actions available
if (availableActions.length == 0) {
@@ -172,8 +166,7 @@
log.debug("call action " + nextAction);
try {
// call the action
- this.osWorkflowInstance.doAction(this.workflowId,
- nextAction, this.map);
+ this.osWorkflowInstance.doAction(this.workflowId,nextAction, this.map);
} catch (InvalidInputException iiex) {
log.error(iiex);
aborted = true;
@@ -184,13 +177,15 @@
}
}
- log.debug("Stopping workflow...");
- log.debug("Name: " + this.osWorkflowName);
- log.debug("Action: " + this.action);
- log.debug("Caller: " + this.caller);
- log.debug("Map: " + this.map);
- log.debug("WorkflowId: " + this.workflowId);
- log.debug("End state: " + (finished ? "Finished" : "Aborted"));
+ if (log.isDebugEnabled()) {
+ log.debug("Stopping workflow...");
+ log.debug("Name: " + this.osWorkflowName);
+ log.debug("Action: " + this.action);
+ log.debug("Caller: " + this.caller);
+ log.debug("Map: " + this.map);
+ log.debug("WorkflowId: " + this.workflowId);
+ log.debug("End state: " + (finished ? "Finished" : "Aborted"));
+ }
// call the endpoint method for cleanup actions or message exchange
this.endpoint.postWorkflow();
Modified:
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflowEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflowEndpoint.java?rev=680712&r1=680711&r2=680712&view=diff
==============================================================================
---
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflowEndpoint.java
(original)
+++
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/java/org/apache/servicemix/osworkflow/OSWorkflowEndpoint.java
Tue Jul 29 07:32:55 2008
@@ -18,39 +18,30 @@
import java.util.HashMap;
-import javax.jbi.component.ComponentContext;
-import javax.jbi.management.DeploymentException;
-import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessageExchange.Role;
-import javax.jbi.messaging.MessageExchangeFactory;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.messaging.RobustInOnly;
-import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;
import javax.xml.transform.Source;
-import org.apache.servicemix.common.Endpoint;
-import org.apache.servicemix.common.ExchangeProcessor;
+import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+import org.apache.servicemix.common.EndpointSupport;
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.executors.Executor;
/**
* @org.apache.xbean.XBean element="endpoint"
*
* @author lhe
*/
-public class OSWorkflowEndpoint extends Endpoint implements ExchangeProcessor {
- private static final long TIME_OUT = 30000;
-
- private ServiceEndpoint activated;
+public class OSWorkflowEndpoint extends ProviderEndpoint {
- private DeliveryChannel channel;
-
- private MessageExchangeFactory exchangeFactory;
+ private static final long TIME_OUT = 30000;
private String workflowName;
@@ -58,79 +49,19 @@
private int action;
- /*
- * (non-Javadoc)
- *
- * @see org.apache.servicemix.common.Endpoint#getRole()
- */
- public Role getRole() {
- return Role.PROVIDER;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.servicemix.common.Endpoint#activate()
- */
- public void activate() throws Exception {
- logger = this.serviceUnit.getComponent().getLogger();
- ComponentContext ctx = getServiceUnit().getComponent()
- .getComponentContext();
- channel = ctx.getDeliveryChannel();
- exchangeFactory = channel.createExchangeFactory();
- activated = ctx.activateEndpoint(service, endpoint);
- start();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.servicemix.common.Endpoint#deactivate()
- */
- public void deactivate() throws Exception {
- stop();
- ServiceEndpoint ep = activated;
- activated = null;
- ComponentContext ctx = getServiceUnit().getComponent()
- .getComponentContext();
- ctx.deactivateEndpoint(ep);
- }
+ private Executor executor;
- /*
- * (non-Javadoc)
- *
- * @see org.apache.servicemix.common.Endpoint#getProcessor()
- */
- public ExchangeProcessor getProcessor() {
- return this;
- }
+ private SourceTransformer sourceTransformer = new SourceTransformer();
- /*
- * (non-Javadoc)
- *
- * @see org.apache.servicemix.common.Endpoint#validate()
- */
- public void validate() throws DeploymentException {
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.servicemix.common.ExchangeProcessor#start()
- */
public void start() throws Exception {
- // initialize the workflow manager
- WorkflowManager.getInstance();
+ super.start();
+ OSWorkflowComponent component = (OSWorkflowComponent)
getServiceUnit().getComponent();
+ executor = component.getExecutorFactory().createExecutor("component."
+ component.getComponentName() + "." + EndpointSupport.getKey(this));
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.servicemix.common.ExchangeProcessor#stop()
- */
- public void stop() {
- // shut down first finishing running threads
- WorkflowManager.getInstance().prepareShutdown(true);
+ public void stop() throws Exception {
+ executor.shutdown();
+ super.stop();
}
/*
@@ -162,8 +93,7 @@
onProviderExchange(exchange);
} else {
// Unknown role
- throw new MessagingException(
- "OSWorkflowEndpoint.onMessageExchange(): Unknown role: "
+ throw new
MessagingException("OSWorkflowEndpoint.onMessageExchange(): Unknown role: "
+ exchange.getRole());
}
}
@@ -178,12 +108,10 @@
throws MessagingException {
// Out message
if (exchange.getMessage("out") != null) {
- exchange.setStatus(ExchangeStatus.DONE);
- channel.send(exchange);
+ done(exchange);
} else if (exchange.getFault() != null) {
//Fault message
- exchange.setStatus(ExchangeStatus.DONE);
- channel.send(exchange);
+ done(exchange);
} else {
//This is not compliant with the default MEPs
throw new MessagingException(
@@ -208,8 +136,7 @@
return;
} else if (exchange.getFault() != null) {
//Fault message
- exchange.setStatus(ExchangeStatus.DONE);
- channel.send(exchange);
+ done(exchange);
} else {
NormalizedMessage in = exchange.getMessage("in");
@@ -222,49 +149,12 @@
OSWorkflow osWorkflow = new OSWorkflow(this, this.workflowName,
this.action, new HashMap(), this.caller, exchange);
- if (exchange instanceof InOnly
- || exchange instanceof RobustInOnly) {
- // do start the workflow in separate thread
- try {
- WorkflowManager.getInstance().executeWorkflow(
- osWorkflow);
- } catch (Exception ex) {
- logger.error(ex);
- }
- } else {
- // synchronous processing, keep state ACTIVE
- // do start the workflow and join the thread
- try {
- osWorkflow.start();
- osWorkflow.join();
- } catch (Exception ex) {
- logger.error(ex);
- }
- }
+ executor.execute(osWorkflow);
}
}
}
/**
- * returns the delivery channel for the endpoint
- *
- * @return the delivery channel
- */
- public DeliveryChannel getChannel() {
-
- return this.channel;
- }
-
- /**
- * returns the message exchange factory
- *
- * @return the message exchange factory
- */
- public MessageExchangeFactory getMessageExchangeFactory() {
- return this.exchangeFactory;
- }
-
- /**
* sends the given DOMSource as message to the given service (inOnly)
*
* @param service
@@ -277,12 +167,11 @@
*/
public boolean sendMessage(QName service, Source source)
throws MessagingException {
- InOnly inOnly = channel.createExchangeFactoryForService(service)
- .createInOnlyExchange();
+ InOnly inOnly =
getChannel().createExchangeFactoryForService(service).createInOnlyExchange();
NormalizedMessage msg = inOnly.createMessage();
msg.setContent(source);
inOnly.setInMessage(msg);
- if (channel.sendSync(inOnly)) {
+ if (getChannel().sendSync(inOnly)) {
return inOnly.getStatus() == ExchangeStatus.DONE;
} else {
return false;
@@ -302,22 +191,15 @@
*/
public Source sendRequest(QName service, Source source)
throws MessagingException {
- InOut inOut = channel.createExchangeFactoryForService(service)
- .createInOutExchange();
+ InOut inOut =
getChannel().createExchangeFactoryForService(service).createInOutExchange();
NormalizedMessage msg = inOut.createMessage();
msg.setContent(source);
inOut.setInMessage(msg);
- if (channel.sendSync(inOut)) {
- SourceTransformer sourceTransformer = new SourceTransformer();
-
+ if (getChannel().sendSync(inOut)) {
try {
- Source result = sourceTransformer.toDOMSource(inOut
- .getOutMessage().getContent());
-
- inOut.setStatus(ExchangeStatus.DONE);
- channel.send(inOut);
-
+ Source result =
sourceTransformer.toDOMSource(inOut.getOutMessage().getContent());
+ done(inOut);
return result;
} catch (Exception ex) {
ex.printStackTrace();
@@ -341,12 +223,11 @@
*/
public MessageExchange sendRawInOutRequest(QName service, Source source)
throws MessagingException {
- InOut inOut = channel.createExchangeFactoryForService(service)
- .createInOutExchange();
+ InOut inOut =
getChannel().createExchangeFactoryForService(service).createInOutExchange();
NormalizedMessage msg = inOut.createMessage();
msg.setContent(source);
inOut.setInMessage(msg);
- if (channel.sendSync(inOut)) {
+ if (getChannel().sendSync(inOut)) {
return inOut;
} else {
return null;
@@ -368,11 +249,9 @@
MessageExchange exchange = null;
if (inOut) {
- exchange = channel.createExchangeFactoryForService(qname)
- .createInOutExchange();
+ exchange =
getChannel().createExchangeFactoryForService(qname).createInOutExchange();
} else {
- exchange = channel.createExchangeFactoryForService(qname)
- .createInOnlyExchange();
+ exchange =
getChannel().createExchangeFactoryForService(qname).createInOnlyExchange();
}
return exchange;
@@ -380,18 +259,17 @@
/**
* sends a done to the channel
- *
+ *
* @param ex
* @throws MessagingException
*/
public void done(MessageExchange ex) throws MessagingException {
- ex.setStatus(ExchangeStatus.DONE);
- channel.send(ex);
+ super.done(ex);
}
/**
* sends a msg to the channel
- *
+ *
* @param ex
* @param sync
* @throws MessagingException
@@ -399,21 +277,20 @@
public void send(MessageExchange ex, boolean sync)
throws MessagingException {
if (sync) {
- channel.sendSync(ex, TIME_OUT);
+ getChannel().sendSync(ex, TIME_OUT);
} else {
- channel.send(ex);
+ getChannel().send(ex);
}
}
/**
* sends a error to the channel
- *
+ *
* @param ex
* @throws MessagingException
*/
public void fail(MessageExchange ex) throws MessagingException {
- ex.setStatus(ExchangeStatus.ERROR);
- channel.send(ex);
+ super.fail(ex, new Exception("Failure"));
}
/**
@@ -474,4 +351,5 @@
public void postWorkflow() {
// nothing for now
}
+
}
Added:
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/META-INF/spring/servicemix-osworkflow.xml
URL:
http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/META-INF/spring/servicemix-osworkflow.xml?rev=680712&view=auto
==============================================================================
---
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/META-INF/spring/servicemix-osworkflow.xml
(added)
+++
servicemix/components/engines/servicemix-osworkflow/trunk/src/main/resources/META-INF/spring/servicemix-osworkflow.xml
Tue Jul 29 07:32:55 2008
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:osgi="http://www.springframework.org/schema/osgi"
+ xmlns:osgix="http://www.springframework.org/schema/osgi-compendium"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd
+ http://www.springframework.org/schema/osgi
+ http://www.springframework.org/schema/osgi/spring-osgi.xsd
+ http://www.springframework.org/schema/osgi-compendium
+ http://www.springframework.org/schema/osgi-compendium/spring-osgi-compendium.xsd">
+
+ <bean id="servicemix-osworkflow"
class="org.apache.servicemix.osworkflow.OSWorkflowComponent">
+ <property name="executorFactory" ref="executorFactory" />
+ </bean>
+
+ <bean id="executorFactory"
class="org.apache.servicemix.executors.impl.ExecutorFactoryImpl">
+ <property name="defaultConfig">
+ <bean class="org.apache.servicemix.executors.impl.ExecutorConfig">
+ <property name="corePoolSize"
value="${threadPoolCorePoolSize}"/>
+ <property name="maximumPoolSize"
value="${threadPoolMaximumPoolSize}"/>
+ <property name="queueSize" value="${threadPoolQueueSize}"/>
+ </bean>
+ </property>
+ </bean>
+
+ <bean id="endpoint-tracker"
class="org.apache.servicemix.common.osgi.EndpointTracker">
+ <property name="component" ref="servicemix-osworkflow" />
+ </bean>
+
+ <osgi:list id="endpoints"
+ interface="org.apache.servicemix.common.osgi.EndpointWrapper"
+ cardinality="0..N">
+ <osgi:listener ref="endpoint-tracker" bind-method="register"
unbind-method="unregister" />
+ </osgi:list>
+
+ <osgi:service ref="servicemix-osworkflow"
interface="javax.jbi.component.Component">
+ <osgi:service-properties>
+ <entry key="NAME" value="servicemix-osworkflow" />
+ <entry key="TYPE" value="service-engine" />
+ </osgi:service-properties>
+ </osgi:service>
+
+ <osgix:property-placeholder persistent-id="servicemix-osworkflow">
+ <osgix:default-properties>
+ <prop key="threadPoolCorePoolSize">8</prop>
+ <prop key="threadPoolMaximumPoolSize">32</prop>
+ <prop key="threadPoolQueueSize">256</prop>
+ </osgix:default-properties>
+ </osgix:property-placeholder>
+
+</beans>