Author: davsclaus
Date: Fri Jan 14 10:36:59 2011
New Revision: 1058930

URL: http://svn.apache.org/viewvc?rev=1058930&view=rev
Log:
CAMEL-3535: Use a copy of the incoming exchange for aggregation without sharing 
the UoW. This ensures that the on-completion handling of the completed exchange 
will always be executed, so the aggregation repository is always confirmed and 
the internal in-progress map has the completed exchange removed as well, 
preventing it from eating memory over time.

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1058930&r1=1058929&r2=1058930&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 Fri Jan 14 10:36:59 2011
@@ -177,12 +177,16 @@ public class AggregateProcessor extends 
             throw new ClosedCorrelationKeyException(key, exchange);
         }
 
+        // copy exchange, and do not share the unit of work
+        // the aggregated output runs in another unit of work
+        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+
         // when memory based then its fast using synchronized, but if the 
aggregation repository is IO
         // bound such as JPA etc then concurrent aggregation per correlation 
key could
         // improve performance as we can run aggregation repository get/add in 
parallel
         lock.lock();
         try {
-            doAggregation(key, exchange);
+            doAggregation(key, copy);
         } finally {
             lock.unlock();
         }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java?rev=1058930&r1=1058929&r2=1058930&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/BeanBeforeAggregateIssueTest.java
 Fri Jan 14 10:36:59 2011
@@ -16,22 +16,37 @@
  */
 package org.apache.camel.processor.aggregator;
 
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
 
 /**
  * @version $Revision$
  */
 public class BeanBeforeAggregateIssueTest extends ContextTestSupport {
 
+    private MyAggRepo myRepo = new MyAggRepo();
+
     public void testBeanBeforeAggregation() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(3).create();
+
         getMockEndpoint("mock:result").expectedBodiesReceived("A+B");
 
         template.sendBody("seda:start", "A");
         template.sendBody("seda:start", "B");
 
         assertMockEndpointsSatisfied();
+
+        // wait for all exchanges to be done (2 input + 1 aggregated)
+        notify.matches(5, TimeUnit.SECONDS);
+
+        // should have confirmed
+        assertTrue("Should have confirmed", myRepo.isConfirm());
     }
 
     @Override
@@ -42,6 +57,7 @@ public class BeanBeforeAggregateIssueTes
                 from("seda:start")
                     .bean(TestBean.class)
                     .aggregate(constant("true"), new 
BodyInAggregatingStrategy())
+                        .aggregationRepository(myRepo)
                         .completionSize(2)
                         .to("mock:result");
             }
@@ -54,4 +70,20 @@ public class BeanBeforeAggregateIssueTes
             return foo;
         }
     }
+
+    private final class MyAggRepo extends MemoryAggregationRepository {
+
+        private volatile boolean confirm;
+
+        @Override
+        public void confirm(CamelContext camelContext, String exchangeId) {
+            // test that confirm is invoked
+            super.confirm(camelContext, exchangeId);
+            confirm = true;
+        }
+
+        public boolean isConfirm() {
+            return confirm;
+        }
+    }
 }


Reply via email to