Author: lhein
Date: Wed Nov 24 14:15:05 2010
New Revision: 1038612
URL: http://svn.apache.org/viewvc?rev=1038612&view=rev
Log:
fixed problem where splitter sets DONE state of the original exchange one part
message too early (see SM-2013)
Modified:
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
Modified:
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
URL:
http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java?rev=1038612&r1=1038611&r2=1038612&view=diff
==============================================================================
---
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
(original)
+++
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
Wed Nov 24 14:15:05 2010
@@ -244,41 +244,44 @@ public abstract class AbstractSplitter e
if (exchange.getStatus() == ExchangeStatus.DONE) {
// If the acks integer is not here anymore, the message
response has been sent already
if (acks != null) {
- if (acks + 1 >= count) {
+ acks++;
+ if (acks < count) {
MessageExchange me = (MessageExchange)
store.load(corrId);
done(me);
} else {
- store.store(corrId + ".acks", Integer.valueOf(acks
+ 1));
+ store.store(corrId + ".acks",
Integer.valueOf(acks));
removeLock = false;
}
}
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
// If the acks integer is not here anymore, the message
response has been sent already
if (acks != null) {
+ acks++;
if (reportErrors) {
MessageExchange me = (MessageExchange)
store.load(corrId);
fail(me, exchange.getError());
- } else if (acks + 1 >= count) {
+ } else if (acks < count) {
MessageExchange me = (MessageExchange)
store.load(corrId);
done(me);
} else {
- store.store(corrId + ".acks", Integer.valueOf(acks
+ 1));
+ store.store(corrId + ".acks",
Integer.valueOf(acks));
removeLock = false;
}
}
} else if (exchange.getFault() != null) {
// If the acks integer is not here anymore, the message
response has been sent already
if (acks != null) {
+ acks++;
if (reportErrors) {
MessageExchange me = (MessageExchange)
store.load(corrId);
MessageUtil.transferToFault(MessageUtil.copyFault(exchange), me);
send(me);
done(exchange);
- } else if (acks + 1 >= count) {
+ } else if (acks < count) {
MessageExchange me = (MessageExchange)
store.load(corrId);
done(me);
} else {
- store.store(corrId + ".acks", Integer.valueOf(acks
+ 1));
+ store.store(corrId + ".acks",
Integer.valueOf(acks));
removeLock = false;
}
} else {
@@ -312,7 +315,8 @@ public abstract class AbstractSplitter e
target.configureTarget(parts[i], getContext());
send(parts[i]);
}
- done(exchange);
+ // do not done the exchange on provider side!
+ //done(exchange);
}
}
}