Author: davsclaus
Date: Fri Jan 14 10:36:59 2011
New Revision: 1058930
URL: http://svn.apache.org/viewvc?rev=1058930&view=rev
Log:
CAMEL-3535: Use a copy of the incoming exchange for aggregation without sharing
UoW. This ensures that on compleiton on the completed exchange always will be
executed. This ensures the aggregation repository will always be confirmed, and
the internal in progress map will have the completed exchange removed as well,
preventing it from eating memory over time.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1058930&r1=1058929&r2=1058930&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Fri Jan 14 10:36:59 2011
@@ -177,12 +177,16 @@ public class AggregateProcessor extends
throw new ClosedCorrelationKeyException(key, exchange);
}
+ // copy exchange, and do not share the unit of work
+ // the aggregated output runs in another unit of work
+ Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+
// when memory based then its fast using synchronized, but if the
aggregation repository is IO
// bound such as JPA etc then concurrent aggregation per correlation
key could
// improve performance as we can run aggregation repository get/add in
parallel
lock.lock();
try {
- doAggregation(key, exchange);
+ doAggregation(key, copy);
} finally {
lock.unlock();
}
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java?rev=1058930&r1=1058929&r2=1058930&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java
Fri Jan 14 10:36:59 2011
@@ -16,22 +16,37 @@
*/
package org.apache.camel.processor.aggregator;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.NotifyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
/**
* @version $Revision$
*/
public class BeanBeforeAggregateIssueTest extends ContextTestSupport {
+ private MyAggRepo myRepo = new MyAggRepo();
+
public void testBeanBeforeAggregation() throws Exception {
+ NotifyBuilder notify = new NotifyBuilder(context).whenDone(3).create();
+
getMockEndpoint("mock:result").expectedBodiesReceived("A+B");
template.sendBody("seda:start", "A");
template.sendBody("seda:start", "B");
assertMockEndpointsSatisfied();
+
+ // wait for all exchanges to be done (2 input + 1 aggregated)
+ notify.matches(5, TimeUnit.SECONDS);
+
+ // should have confirmed
+ assertTrue("Should have confirmed", myRepo.isConfirm());
}
@Override
@@ -42,6 +57,7 @@ public class BeanBeforeAggregateIssueTes
from("seda:start")
.bean(TestBean.class)
.aggregate(constant("true"), new
BodyInAggregatingStrategy())
+ .aggregationRepository(myRepo)
.completionSize(2)
.to("mock:result");
}
@@ -54,4 +70,20 @@ public class BeanBeforeAggregateIssueTes
return foo;
}
}
+
+ private final class MyAggRepo extends MemoryAggregationRepository {
+
+ private volatile boolean confirm;
+
+ @Override
+ public void confirm(CamelContext camelContext, String exchangeId) {
+ // test that confirm is invoked
+ super.confirm(camelContext, exchangeId);
+ confirm = true;
+ }
+
+ public boolean isConfirm() {
+ return confirm;
+ }
+ }
}