Author: davsclaus
Date: Wed Oct 31 14:39:22 2012
New Revision: 1404174
URL: http://svn.apache.org/viewvc?rev=1404174&view=rev
Log:
CAMEL-5760: EIPs with AggregationStrategy now control lifecycle on strategy to
allow end user to do custom logic in start/stop
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyServiceTest.java
- copied, changed from r1404122,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=1404174&r1=1404173&r2=1404174&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
Wed Oct 31 14:39:22 2012
@@ -225,11 +225,11 @@ public class Enricher extends ServiceSup
}
protected void doStart() throws Exception {
- ServiceHelper.startService(producer);
+ ServiceHelper.startServices(aggregationStrategy, producer);
}
protected void doStop() throws Exception {
- ServiceHelper.stopService(producer);
+ ServiceHelper.stopServices(producer, aggregationStrategy);
}
private static class CopyAggregationStrategy implements
AggregationStrategy {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1404174&r1=1404173&r2=1404174&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Wed Oct 31 14:39:22 2012
@@ -932,7 +932,7 @@ public class MulticastProcessor extends
String name = getClass().getSimpleName() + "-AggregateTask";
aggregateExecutorService = createAggregateExecutorService(name);
}
- ServiceHelper.startServices(processors);
+ ServiceHelper.startServices(aggregationStrategy, processors);
}
/**
@@ -949,12 +949,12 @@ public class MulticastProcessor extends
@Override
protected void doStop() throws Exception {
- ServiceHelper.stopServices(processors, errorHandlers);
+ ServiceHelper.stopServices(processors, errorHandlers,
aggregationStrategy);
}
@Override
protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(processors, errorHandlers);
+ ServiceHelper.stopAndShutdownServices(processors, errorHandlers,
aggregationStrategy);
// only clear error handlers when shutting down
errorHandlers.clear();
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?rev=1404174&r1=1404173&r2=1404174&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
Wed Oct 31 14:39:22 2012
@@ -194,11 +194,11 @@ public class PollEnricher extends Servic
}
protected void doStart() throws Exception {
- ServiceHelper.startService(consumer);
+ ServiceHelper.startServices(aggregationStrategy, consumer);
}
protected void doStop() throws Exception {
- ServiceHelper.stopService(consumer);
+ ServiceHelper.stopServices(consumer, aggregationStrategy);
}
private static class CopyAggregationStrategy implements
AggregationStrategy {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=1404174&r1=1404173&r2=1404174&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
Wed Oct 31 14:39:22 2012
@@ -163,15 +163,15 @@ public class RecipientList extends Servi
if (producerCache == null) {
producerCache = new ProducerCache(this, camelContext);
}
- ServiceHelper.startService(producerCache);
+ ServiceHelper.startServices(aggregationStrategy, producerCache);
}
protected void doStop() throws Exception {
- ServiceHelper.stopService(producerCache);
+ ServiceHelper.stopServices(producerCache, aggregationStrategy);
}
protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownService(producerCache);
+ ServiceHelper.stopAndShutdownServices(producerCache,
aggregationStrategy);
if (shutdownExecutorService && executorService != null) {
camelContext.getExecutorServiceManager().shutdownNow(executorService);
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1404174&r1=1404173&r2=1404174&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Wed Oct 31 14:39:22 2012
@@ -849,7 +849,7 @@ public class AggregateProcessor extends
}
}
- ServiceHelper.startServices(processor, aggregationRepository);
+ ServiceHelper.startServices(aggregationStrategy, processor,
aggregationRepository);
// should we use recover checker
if (aggregationRepository instanceof RecoverableAggregationRepository)
{
@@ -966,8 +966,8 @@ public class AggregateProcessor extends
@Override
protected void doShutdown() throws Exception {
- // shutdown aggregation repository
- ServiceHelper.stopService(aggregationRepository);
+ // shutdown aggregation repository and the strategy
+ ServiceHelper.stopAndShutdownServices(aggregationRepository,
aggregationStrategy);
// cleanup when shutting down
inProgressCompleteExchanges.clear();
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java?rev=1404174&r1=1404173&r2=1404174&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
Wed Oct 31 14:39:22 2012
@@ -35,6 +35,10 @@ import org.apache.camel.Exchange;
* Possible implementations include performing some kind of combining or delta
processing, such as adding line items
* together into an invoice or just using the newest exchange and removing old
exchanges such as for state tracking or
* market data prices; where old values are of little use.
+ * <p/>
+ * If an implementation also implements {@link org.apache.camel.Service} then
any <a href="http://camel.apache.org/eip">EIP</a>
+ * that allowing configuring a {@link AggregationStrategy} will invoke the
{@link org.apache.camel.Service#start()}
+ * and {@link org.apache.camel.Service#stop()} to control the lifecycle
aligned with the EIP itself.
*
* @version
*/
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyServiceTest.java
(from r1404122,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyServiceTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyServiceTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java&r1=1404122&r2=1404174&rev=1404174&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyServiceTest.java
Wed Oct 31 14:39:22 2012
@@ -16,21 +16,25 @@
*/
package org.apache.camel.processor.aggregator;
-import java.util.List;
-
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.support.ServiceSupport;
/**
*
*/
-public class CustomListAggregationStrategyTest extends ContextTestSupport {
+public class CustomAggregationStrategyServiceTest extends ContextTestSupport {
+
+ private MyCustomStrategy strategy = new MyCustomStrategy();
@SuppressWarnings("unchecked")
public void testCustomAggregationStrategy() throws Exception {
+ assertTrue("Should be started", strategy.start);
+ assertFalse("Should not be stopped", strategy.stop);
+
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(1);
@@ -40,12 +44,11 @@ public class CustomListAggregationStrate
assertMockEndpointsSatisfied();
- // the list will be stored as the message body by default
- List<Integer> numbers =
result.getExchanges().get(0).getIn().getBody(List.class);
- assertNotNull(numbers);
- assertEquals(Integer.valueOf("100"), numbers.get(0));
- assertEquals(Integer.valueOf("150"), numbers.get(1));
- assertEquals(Integer.valueOf("130"), numbers.get(2));
+ // stop Camel
+ context.stop();
+
+ assertFalse("Should not be started", strategy.start);
+ assertTrue("Should be stopped", strategy.stop);
}
@Override
@@ -54,25 +57,34 @@ public class CustomListAggregationStrate
@Override
public void configure() throws Exception {
from("direct:start")
- .aggregate(new MyListOfNumbersStrategy()).header("id")
+ .aggregate(strategy).header("id")
.completionSize(3)
.to("mock:result");
}
};
}
- // START SNIPPET: e1
- /**
- * Our strategy just group a list of integers.
- */
- public final class MyListOfNumbersStrategy extends
AbstractListAggregationStrategy<Integer> {
+ public final class MyCustomStrategy extends ServiceSupport implements
AggregationStrategy {
+
+ public boolean stop;
+ public boolean start;
+
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ return newExchange;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ start = true;
+ stop = false;
+ }
@Override
- public Integer getValue(Exchange exchange) {
- // the message body contains a number, so just return that as-is
- return exchange.getIn().getBody(Integer.class);
+ protected void doStop() throws Exception {
+ stop = true;
+ start = false;
}
}
- // END SNIPPET: e1
}