Author: davsclaus
Date: Wed Oct 31 14:39:22 2012
New Revision: 1404174

URL: http://svn.apache.org/viewvc?rev=1404174&view=rev
Log:
CAMEL-5760: EIPs with an AggregationStrategy now control the lifecycle of the 
strategy, allowing end users to run custom logic in start/stop

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyServiceTest.java
      - copied, changed from r1404122, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=1404174&r1=1404173&r2=1404174&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java 
Wed Oct 31 14:39:22 2012
@@ -225,11 +225,11 @@ public class Enricher extends ServiceSup
     }
 
     protected void doStart() throws Exception {
-        ServiceHelper.startService(producer);
+        ServiceHelper.startServices(aggregationStrategy, producer);
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(producer);
+        ServiceHelper.stopServices(producer, aggregationStrategy);
     }
 
     private static class CopyAggregationStrategy implements 
AggregationStrategy {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1404174&r1=1404173&r2=1404174&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 Wed Oct 31 14:39:22 2012
@@ -932,7 +932,7 @@ public class MulticastProcessor extends 
             String name = getClass().getSimpleName() + "-AggregateTask";
             aggregateExecutorService = createAggregateExecutorService(name);
         }
-        ServiceHelper.startServices(processors);
+        ServiceHelper.startServices(aggregationStrategy, processors);
     }
 
     /**
@@ -949,12 +949,12 @@ public class MulticastProcessor extends 
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(processors, errorHandlers);
+        ServiceHelper.stopServices(processors, errorHandlers, 
aggregationStrategy);
     }
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(processors, errorHandlers);
+        ServiceHelper.stopAndShutdownServices(processors, errorHandlers, 
aggregationStrategy);
         // only clear error handlers when shutting down
         errorHandlers.clear();
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?rev=1404174&r1=1404173&r2=1404174&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
 Wed Oct 31 14:39:22 2012
@@ -194,11 +194,11 @@ public class PollEnricher extends Servic
     }
 
     protected void doStart() throws Exception {
-        ServiceHelper.startService(consumer);
+        ServiceHelper.startServices(aggregationStrategy, consumer);
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(consumer);
+        ServiceHelper.stopServices(consumer, aggregationStrategy);
     }
 
     private static class CopyAggregationStrategy implements 
AggregationStrategy {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=1404174&r1=1404173&r2=1404174&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
 Wed Oct 31 14:39:22 2012
@@ -163,15 +163,15 @@ public class RecipientList extends Servi
         if (producerCache == null) {
             producerCache = new ProducerCache(this, camelContext);
         }
-        ServiceHelper.startService(producerCache);
+        ServiceHelper.startServices(aggregationStrategy, producerCache);
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(producerCache);
+        ServiceHelper.stopServices(producerCache, aggregationStrategy);
     }
 
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownService(producerCache);
+        ServiceHelper.stopAndShutdownServices(producerCache, 
aggregationStrategy);
 
         if (shutdownExecutorService && executorService != null) {
             
camelContext.getExecutorServiceManager().shutdownNow(executorService);

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1404174&r1=1404173&r2=1404174&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 Wed Oct 31 14:39:22 2012
@@ -849,7 +849,7 @@ public class AggregateProcessor extends 
             }
         }
 
-        ServiceHelper.startServices(processor, aggregationRepository);
+        ServiceHelper.startServices(aggregationStrategy, processor, 
aggregationRepository);
 
         // should we use recover checker
         if (aggregationRepository instanceof RecoverableAggregationRepository) 
{
@@ -966,8 +966,8 @@ public class AggregateProcessor extends 
 
     @Override
     protected void doShutdown() throws Exception {
-        // shutdown aggregation repository
-        ServiceHelper.stopService(aggregationRepository);
+        // shutdown aggregation repository and the strategy
+        ServiceHelper.stopAndShutdownServices(aggregationRepository, 
aggregationStrategy);
 
         // cleanup when shutting down
         inProgressCompleteExchanges.clear();

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java?rev=1404174&r1=1404173&r2=1404174&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
 Wed Oct 31 14:39:22 2012
@@ -35,6 +35,10 @@ import org.apache.camel.Exchange;
  * Possible implementations include performing some kind of combining or delta 
processing, such as adding line items
  * together into an invoice or just using the newest exchange and removing old 
exchanges such as for state tracking or
  * market data prices; where old values are of little use.
+ * <p/>
+ * If an implementation also implements {@link org.apache.camel.Service} then 
any <a href="http://camel.apache.org/eip">EIP</a>
+ * that allows configuring an {@link AggregationStrategy} will invoke 
{@link org.apache.camel.Service#start()}
+ * and {@link org.apache.camel.Service#stop()} so that the strategy's lifecycle 
is aligned with the EIP itself.
  * 
  * @version 
  */

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyServiceTest.java
 (from r1404122, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyServiceTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyServiceTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java&r1=1404122&r2=1404174&rev=1404174&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyServiceTest.java
 Wed Oct 31 14:39:22 2012
@@ -16,21 +16,25 @@
  */
 package org.apache.camel.processor.aggregator;
 
-import java.util.List;
-
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.support.ServiceSupport;
 
 /**
  *
  */
-public class CustomListAggregationStrategyTest extends ContextTestSupport {
+public class CustomAggregationStrategyServiceTest extends ContextTestSupport {
+
+    private MyCustomStrategy strategy = new MyCustomStrategy();
 
     @SuppressWarnings("unchecked")
     public void testCustomAggregationStrategy() throws Exception {
+        assertTrue("Should be started", strategy.start);
+        assertFalse("Should not be stopped", strategy.stop);
+
         MockEndpoint result = getMockEndpoint("mock:result");
         result.expectedMessageCount(1);
 
@@ -40,12 +44,11 @@ public class CustomListAggregationStrate
 
         assertMockEndpointsSatisfied();
 
-        // the list will be stored as the message body by default
-        List<Integer> numbers = 
result.getExchanges().get(0).getIn().getBody(List.class);
-        assertNotNull(numbers);
-        assertEquals(Integer.valueOf("100"), numbers.get(0));
-        assertEquals(Integer.valueOf("150"), numbers.get(1));
-        assertEquals(Integer.valueOf("130"), numbers.get(2));
+        // stop Camel
+        context.stop();
+
+        assertFalse("Should not be started", strategy.start);
+        assertTrue("Should be stopped", strategy.stop);
     }
 
     @Override
@@ -54,25 +57,34 @@ public class CustomListAggregationStrate
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .aggregate(new MyListOfNumbersStrategy()).header("id")
+                    .aggregate(strategy).header("id")
                     .completionSize(3)
                     .to("mock:result");
             }
         };
     }
 
-    // START SNIPPET: e1
-    /**
-     * Our strategy just group a list of integers.
-     */
-    public final class MyListOfNumbersStrategy extends 
AbstractListAggregationStrategy<Integer> {
+    public final class MyCustomStrategy extends ServiceSupport implements 
AggregationStrategy {
+
+        public boolean stop;
+        public boolean start;
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            return newExchange;
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            start = true;
+            stop = false;
+        }
 
         @Override
-        public Integer getValue(Exchange exchange) {
-            // the message body contains a number, so just return that as-is
-            return exchange.getIn().getBody(Integer.class);
+        protected void doStop() throws Exception {
+            stop = true;
+            start = false;
         }
     }
-    // END SNIPPET: e1
 
 }


Reply via email to