Author: gertv
Date: Thu Aug 7 04:22:24 2008
New Revision: 683584
URL: http://svn.apache.org/viewvc?rev=683584&view=rev
Log:
SM-1502: Fixing async support for servicemix-drools when the same endpoint is
hit multiple times in the same flow
Modified:
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
Modified:
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
URL:
http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java?rev=683584&r1=683583&r2=683584&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java Thu Aug 7 04:22:24 2008
@@ -27,6 +27,11 @@
*/
@SuppressWarnings("unchecked")
public class DroolsComponent extends DefaultComponent {
+
+ /**
+ * Property to correlate servicemix-drools exchanges
+ */
+ public static final String DROOLS_CORRELATION_ID =
"org.apache.servicemix.drools.correlation_id";
private DroolsEndpoint[] endpoints;
Modified:
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java?rev=683584&r1=683583&r2=683584&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java Thu Aug 7 04:22:24 2008
@@ -192,7 +192,7 @@
* Handle a consumer exchange
*/
private void handleConsumerExchange(MessageExchange exchange) throws
MessagingException {
- String correlation = getCorrelationId(exchange);
+ String correlation = (String)
exchange.getProperty(DroolsComponent.DROOLS_CORRELATION_ID);
JbiHelper helper = pending.get(correlation);
if (helper != null) {
MessageExchange original =
helper.getExchange().getInternalExchange();
@@ -211,7 +211,7 @@
// update the rule engine's working memory to trigger post-done rules
helper.update();
} else {
- logger.debug("No matching exchange found for " +
exchange.getExchangeId() + ", no additional rules will be triggered");
+ logger.debug("No pending exchange found for " + correlation + ",
no additional rules will be triggered");
}
}
@@ -233,15 +233,15 @@
protected void drools(MessageExchange exchange) throws Exception {
WorkingMemory memory = createWorkingMemory(exchange);
JbiHelper helper = populateWorkingMemory(memory, exchange);
- pending.put(getCorrelationId(exchange), helper);
+ pending.put(exchange.getExchangeId(), helper);
memory.fireAllRules();
//no rules were fired --> must be config problem
if (helper.getRulesFired() < 1) {
fail(exchange, new Exception("No rules have handled the exchange.
Check your rule base."));
} else {
- //a rule was triggered but no message was forwarded -> message has been handled by drools
- if (helper.getForwarded() < 1) {
+ //a rule was triggered and the message has been answered or faulted by the drools endpoint
+ if (helper.isExchangeHandled()) {
pending.remove(exchange);
}
}
@@ -306,7 +306,7 @@
@Override
protected void send(MessageExchange me) throws MessagingException {
//remove the exchange from the list of pending exchanges
- pending.remove(getCorrelationId(me));
+ pending.remove(me.getExchangeId());
super.send(me);
}
}
Modified:
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
URL:
http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java?rev=683584&r1=683583&r2=683584&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java Thu Aug 7 04:22:24 2008
@@ -30,6 +30,7 @@
import org.apache.servicemix.common.EndpointSupport;
import org.apache.servicemix.common.JbiConstants;
import org.apache.servicemix.common.util.URIResolver;
+import org.apache.servicemix.drools.DroolsComponent;
import org.apache.servicemix.drools.DroolsEndpoint;
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.jaxp.StringSource;
@@ -51,7 +52,7 @@
private WorkingMemory memory;
private FactHandle exchangeFactHandle;
private int rulesFired;
- private int forwarded;
+ private boolean exchangeHandled = false;
public JbiHelper(DroolsEndpoint endpoint, MessageExchange exchange,
WorkingMemory memory) {
this.endpoint = endpoint;
@@ -126,8 +127,8 @@
String key = EndpointSupport.getKey(endpoint);
newMe.setProperty(JbiConstants.SENDER_ENDPOINT, key);
newMe.setProperty(JbiConstants.CORRELATION_ID,
DroolsEndpoint.getCorrelationId(this.exchange.getInternalExchange()));
+ newMe.setProperty(DroolsComponent.DROOLS_CORRELATION_ID,
me.getExchangeId());
getChannel().send(newMe);
- forwarded++;
}
/**
@@ -161,6 +162,7 @@
me.setFault(fault);
getChannel().send(me);
}
+ exchangeHandled = true;
}
/**
@@ -180,6 +182,7 @@
me.setFault(fault);
getChannel().send(me);
}
+ exchangeHandled = true;
}
/**
@@ -201,6 +204,7 @@
out.setContent(content);
me.setMessage(out, "out");
getChannel().sendSync(me);
+ exchangeHandled = true;
update();
}
@@ -221,12 +225,12 @@
}
/**
- * Get the number of times a message has been forwarded
+ * Has the MessageExchange been handled by the drools endpoint?
*
- * @return the number of forwards
+ * @return
*/
- public int getForwarded() {
- return forwarded;
+ public boolean isExchangeHandled() {
+ return exchangeHandled;
}
// event handler callbacks