Author: gertv
Date: Thu Aug 7 00:03:07 2008
New Revision: 683528
URL: http://svn.apache.org/viewvc?rev=683528&view=rev
Log:
SM-1502: servicemix-drools should allow for asynchronous message handling
Modified:
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
Modified:
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
URL:
http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java?rev=683528&r1=683527&r2=683528&view=diff
==============================================================================
---
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
(original)
+++
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
Thu Aug 7 00:03:07 2008
@@ -25,6 +25,7 @@
* @author gnodet
* @org.apache.xbean.XBean element="component"
*/
+@SuppressWarnings("unchecked")
public class DroolsComponent extends DefaultComponent {
private DroolsEndpoint[] endpoints;
Modified:
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java?rev=683528&r1=683527&r2=683528&view=diff
==============================================================================
---
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
(original)
+++
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
Thu Aug 7 00:03:07 2008
@@ -21,18 +21,24 @@
import java.net.URL;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import javax.jbi.JBIException;
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.NamespaceContext;
import javax.xml.namespace.QName;
import org.apache.servicemix.common.DefaultComponent;
+import org.apache.servicemix.common.JbiConstants;
import org.apache.servicemix.common.ServiceUnit;
import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+import org.apache.servicemix.common.util.MessageUtil;
import org.apache.servicemix.drools.model.JbiHelper;
import org.drools.RuleBase;
import org.drools.WorkingMemory;
@@ -54,6 +60,7 @@
private String defaultTargetURI;
private Map<String, Object> globals;
private List<Object> assertedObjects;
+ private ConcurrentMap<String, JbiHelper> pending = new
ConcurrentHashMap<String, JbiHelper>();
public DroolsEndpoint() {
super();
@@ -174,26 +181,69 @@
* javax.jbi.messaging.MessageExchange,
javax.jbi.messaging.NormalizedMessage)
*/
public void process(MessageExchange exchange) throws Exception {
- drools(exchange);
+ if (exchange.getRole() == Role.PROVIDER) {
+ handleProviderExchange(exchange);
+ } else {
+ handleConsumerExchange(exchange);
+ }
}
- protected void drools(MessageExchange exchange) throws Exception {
- WorkingMemory memory = createWorkingMemory(exchange);
- populateWorkingMemory(memory, exchange);
- memory.fireAllRules();
- postProcess(exchange, memory);
+ /*
+ * Handle a consumer exchange
+ */
+ private void handleConsumerExchange(MessageExchange exchange) throws
MessagingException {
+ String correlation = getCorrelationId(exchange);
+ JbiHelper helper = pending.get(correlation);
+ if (helper != null) {
+ MessageExchange original =
helper.getExchange().getInternalExchange();
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ done(original);
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ fail(original, exchange.getError());
+ } else {
+ if (exchange.getFault() != null) {
+ MessageUtil.transferFaultToFault(exchange, original);
+ } else {
+ MessageUtil.transferOutToOut(exchange, original);
+ }
+ send(original);
+ }
+ // update the rule engine's working memory to trigger post-done
rules
+ helper.update();
+ } else {
+ logger.debug("No matching exchange found for " +
exchange.getExchangeId() + ", no additional rules will be triggered");
+ }
}
- protected void postProcess(MessageExchange exchange, WorkingMemory memory)
throws Exception {
+ private void handleProviderExchange(MessageExchange exchange) throws
Exception {
if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- String uri = getDefaultRouteURI();
- if (uri != null) {
- JbiHelper helper = (JbiHelper) memory.getGlobal("jbi");
- helper.route(uri);
- }
+ drools(exchange);
}
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ }
+
+ public static String getCorrelationId(MessageExchange exchange) {
+ Object correlation = exchange.getProperty(JbiConstants.CORRELATION_ID);
+ if (correlation == null) {
+ return exchange.getExchangeId();
+ } else {
+ return correlation.toString();
+ }
+ }
+
+ protected void drools(MessageExchange exchange) throws Exception {
+ WorkingMemory memory = createWorkingMemory(exchange);
+ JbiHelper helper = populateWorkingMemory(memory, exchange);
+ pending.put(getCorrelationId(exchange), helper);
+ memory.fireAllRules();
+
+ //no rules were fired --> must be config problem
+ if (helper.getRulesFired() < 1) {
fail(exchange, new Exception("No rules have handled the exchange.
Check your rule base."));
+ } else {
+ //a rule was triggered but no message was forwarded -> message has
been handled by drools
+ if (helper.getForwarded() < 1) {
+ pending.remove(exchange);
+ }
}
}
@@ -201,8 +251,9 @@
return ruleBase.newStatefulSession();
}
- protected void populateWorkingMemory(WorkingMemory memory, MessageExchange
exchange) throws Exception {
- memory.setGlobal("jbi", new JbiHelper(this, exchange, memory));
+ protected JbiHelper populateWorkingMemory(WorkingMemory memory,
MessageExchange exchange) throws Exception {
+ JbiHelper helper = new JbiHelper(this, exchange, memory);
+ memory.setGlobal("jbi", helper);
if (assertedObjects != null) {
for (Object o : assertedObjects) {
memory.insert(o);
@@ -213,6 +264,7 @@
memory.setGlobal(e.getKey(), e.getValue());
}
}
+ return helper;
}
public QName getDefaultTargetService() {
@@ -250,4 +302,11 @@
return null;
}
}
-}
+
+ @Override
+ protected void send(MessageExchange me) throws MessagingException {
+ //remove the exchange from the list of pending exchanges
+ pending.remove(getCorrelationId(me));
+ super.send(me);
+ }
+ }
Modified:
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
URL:
http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java?rev=683528&r1=683527&r2=683528&view=diff
==============================================================================
---
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
(original)
+++
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
Thu Aug 7 00:03:07 2008
@@ -18,7 +18,6 @@
import javax.jbi.component.ComponentContext;
import javax.jbi.messaging.DeliveryChannel;
-import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.Fault;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
@@ -30,32 +29,35 @@
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.common.EndpointSupport;
import org.apache.servicemix.common.JbiConstants;
-import org.apache.servicemix.common.util.MessageUtil;
import org.apache.servicemix.common.util.URIResolver;
import org.apache.servicemix.drools.DroolsEndpoint;
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.util.MessageUtil;
import org.drools.FactHandle;
import org.drools.WorkingMemory;
+import org.drools.event.ActivationCreatedEvent;
+import org.drools.event.DefaultAgendaEventListener;
/**
* A helper class for use inside a rule to forward a message to an endpoint
- *
+ *
* @version $Revision: 426415 $
*/
-public class JbiHelper {
+public class JbiHelper extends DefaultAgendaEventListener {
private DroolsEndpoint endpoint;
private Exchange exchange;
private WorkingMemory memory;
private FactHandle exchangeFactHandle;
+ private int rulesFired;
+ private int forwarded;
- public JbiHelper(DroolsEndpoint endpoint,
- MessageExchange exchange,
- WorkingMemory memory) {
+ public JbiHelper(DroolsEndpoint endpoint, MessageExchange exchange,
WorkingMemory memory) {
this.endpoint = endpoint;
this.exchange = new Exchange(exchange, endpoint.getNamespaceContext());
this.memory = memory;
+ this.memory.addEventListener(this);
this.exchangeFactHandle = this.memory.insert(this.exchange);
}
@@ -81,14 +83,17 @@
/**
* Forwards the inbound message to the given target
- *
+ *
* @param uri
*/
public void route(String uri) throws MessagingException {
Source src = null;
routeTo(src, uri);
}
-
+
+ /**
+ * @see #routeTo(Source, String)
+ */
public void routeTo(String content, String uri) throws MessagingException {
if (content == null) {
routeTo(this.exchange.getInternalExchange().getMessage("in").getContent(), uri);
@@ -96,10 +101,17 @@
routeTo(new StringSource(content), uri);
}
}
-
+
+ /**
+ * Send a message to the uri
+ *
+ * @param content the message content
+ * @param uri the target endpoint's uri
+ * @throws MessagingException
+ */
public void routeTo(Source content, String uri) throws MessagingException {
MessageExchange me = this.exchange.getInternalExchange();
- String correlationId =
(String)exchange.getProperty(JbiConstants.CORRELATION_ID);
+
NormalizedMessage in = null;
if (content == null) {
in = me.getMessage("in");
@@ -113,35 +125,31 @@
// Set the sender endpoint property
String key = EndpointSupport.getKey(endpoint);
newMe.setProperty(JbiConstants.SENDER_ENDPOINT, key);
- newMe.setProperty(JbiConstants.CORRELATION_ID, correlationId);
- getChannel().sendSync(newMe);
- if (newMe.getStatus() == ExchangeStatus.DONE) {
- me.setStatus(ExchangeStatus.DONE);
- getChannel().send(me);
- } else if (newMe.getStatus() == ExchangeStatus.ERROR) {
- me.setStatus(ExchangeStatus.ERROR);
- me.setError(newMe.getError());
- getChannel().send(me);
- } else {
- if (newMe.getFault() != null) {
- MessageUtil.transferFaultToFault(newMe, me);
- } else {
- MessageUtil.transferOutToOut(newMe, me);
- }
- getChannel().sendSync(me);
- }
- update();
+ newMe.setProperty(JbiConstants.CORRELATION_ID,
DroolsEndpoint.getCorrelationId(this.exchange.getInternalExchange()));
+ getChannel().send(newMe);
+ forwarded++;
}
-
-
+
+ /**
+ * @see #routeToDefault(Source)
+ */
public void routeToDefault(String content) throws MessagingException {
routeTo(content, endpoint.getDefaultRouteURI());
}
-
+
+ /**
+ * Send this content to the default routing URI ({@link
DroolsEndpoint#getDefaultRouteURI()} specified on the endpoint
+ *
+ * @param content the message body
+ * @throws MessagingException
+ */
public void routeToDefault(Source content) throws MessagingException {
routeTo(content, endpoint.getDefaultRouteURI());
}
+ /**
+ * @see #fault(Source)
+ */
public void fault(String content) throws Exception {
MessageExchange me = this.exchange.getInternalExchange();
if (me instanceof InOnly) {
@@ -151,11 +159,16 @@
Fault fault = me.createFault();
fault.setContent(new StringSource(content));
me.setFault(fault);
- getChannel().sendSync(me);
+ getChannel().send(me);
}
- update();
}
-
+
+ /**
+ * Send a JBI Error message (for InOnly) or JBI Fault message (for the
other MEPs)
+ *
+ * @param content the error content
+ * @throws Exception
+ */
public void fault(Source content) throws Exception {
MessageExchange me = this.exchange.getInternalExchange();
if (me instanceof InOnly) {
@@ -165,15 +178,23 @@
Fault fault = me.createFault();
fault.setContent(content);
me.setFault(fault);
- getChannel().sendSync(me);
+ getChannel().send(me);
}
- update();
}
+ /**
+ * @see #answer(Source)
+ */
public void answer(String content) throws Exception {
answer(new StringSource(content));
}
-
+
+ /**
+ * Answer the exchange with the given response content
+ *
+ * @param content the response
+ * @throws Exception
+ */
public void answer(Source content) throws Exception {
MessageExchange me = this.exchange.getInternalExchange();
NormalizedMessage out = me.createMessage();
@@ -183,8 +204,35 @@
update();
}
- protected void update() {
+ /**
+ * Update the {@link MessageExchange} information in the rule
engine's {@link WorkingMemory}
+ */
+ public void update() {
this.memory.update(this.exchangeFactHandle, this.exchange);
}
+ /**
+ * Get the number of rules that were fired
+ *
+ * @return the number of rules
+ */
+ public int getRulesFired() {
+ return rulesFired;
+ }
+
+ /**
+ * Get the number of times a message has been forwarded
+ *
+ * @return the number of forwards
+ */
+ public int getForwarded() {
+ return forwarded;
+ }
+
+ // event handler callbacks
+ @Override
+ public void activationCreated(ActivationCreatedEvent event, WorkingMemory
workingMemory) {
+ rulesFired++;
+ }
+
}