Hello!

I was thinking about this problem a little, and I would like to ask
someone more fluent in camel about his/her thoughts about the thing.

I thought about creating common base component based on something like
DelayedQueue class where whenever you receive an exchange you will
execute strategy class that is responsible for creating
DelayedExchange class (wrapper for Exchange that implements Delayed)
and putting it to DelayedQueue.

Then you have one thread that just monitors this DelayedQueue and
sends any exchange that is retrieved from this queue (very similar to
StreamResequencer).

This way we can create Delayer that will not block current thread.

On the other hand we can create this strategy in a way that on every
new exchange it will remove pending exchange from DelayedQueue, modify
it (using some AggregatorStrategy) and put it once again to this
Queue. If it will notice that aggregation is complete it will add
aggregated Exchange to this queue with delay == 0.

Logic to specify correlationId, default delay or batch size can be
exposed as an Expression.

I've already started to code it this way so if you have any comments
for this then let me know.
I hope that if it will be finished it could be included in camel distribution.

Cheers
Roman
2007/11/5, almilo <[EMAIL PROTECTED]>:
>
> Hi all:
>
> First, Camel is a very interesting project. Congrats to the team!!
>
> Now the question...
>
> I´ve seen a post about better support for Aggregator pattern but with no
> answer. The testcases seem to be very simple and I think this is a really
> relevant pattern for distributed processing. But, being based in "a priori"
> batch size and timeout it lacks value for most of the uses I can think of.
>
> Any plans for an Aggregator face-lift? :)
>
> I attach an imaginative testcase :O) with some comments on FIXMEs
>
> Lot of thanks, Alberto Mijares
>
> // ------------ START ---------------//
> package org.fundacionctic.taw;
>
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
>
> import org.apache.camel.CamelTemplate;
> import org.apache.camel.ContextTestSupport;
> import org.apache.camel.Exchange;
> import org.apache.camel.Message;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.model.AggregatorType;
> import org.apache.camel.processor.aggregate.AggregationStrategy;
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
>
> public class AggregatorTest extends ContextTestSupport {
>
>         private Log log = LogFactory.getLog(this.getClass());
>
>         private static final String SURNAME_HEADER = "surname";
>
>         private static final String TYPE_HEADER = "type";
>
>         private static final String BROTHERS_TYPE = "brothers";
>
>         public void testAggregator() throws Exception {
>
>                 String allNames = "Harpo Marx,Fiodor Karamazov,Chico Marx,Ivan
> Karamazov,Groucho Marx,Alexei Karamazov,Dimitri Karamazov";
>
>                 List<String> marxBrothers = new ArrayList<String>();
>                 marxBrothers.add("Harpo");
>                 marxBrothers.add("Chico");
>                 marxBrothers.add("Groucho");
>
>                 List<String> karamazovBrothers = new ArrayList<String>();
>                 karamazovBrothers.add("Fiodor");
>                 karamazovBrothers.add("Ivan");
>                 karamazovBrothers.add("Alexei");
>                 karamazovBrothers.add("Dimitri");
>
>                 Map<String, List> allBrothers = new HashMap<String, List>();
>                 allBrothers.put("Marx", marxBrothers);
>                 allBrothers.put("Karamazov", karamazovBrothers);
>
>                 MockEndpoint resultEndpoint = 
> resolveMandatoryEndpoint("mock:result",
>                                 MockEndpoint.class);
>                 resultEndpoint.expectedMessageCount(1);
>                 resultEndpoint.expectedBodiesReceived(allBrothers);
>
>                 CamelTemplate template = new CamelTemplate(context);
>                 template.sendBody("direct:start", allNames);
>
>                 resultEndpoint.assertIsSatisfied();
>
>         }
>
>         @Override
>         protected RouteBuilder createRouteBuilder() throws Exception {
>
>                 return new RouteBuilder() {
>
>                         private void debugIn(String stringId, Exchange 
> oldExchange,
>                                         Exchange newExchange) {
>
>                                 log.debug(stringId + " old headers in: "
>                                                 + 
> oldExchange.getIn().getHeaders());
>                                 log.debug(stringId + " old body in: "
>                                                 + 
> oldExchange.getIn().getBody());
>                                 log.debug(stringId + " new headers in: "
>                                                 + 
> newExchange.getIn().getHeaders());
>                                 log.debug(stringId + " new body in: "
>                                                 + 
> newExchange.getIn().getBody());
>
>                         }
>
>                         private void debugOut(String stringId, Exchange 
> exchange) {
>
>                                 log.debug(stringId + " old headers out: "
>                                                 + 
> exchange.getIn().getHeaders());
>                                 log.debug(stringId + " old body out: "
>                                                 + exchange.getIn().getBody());
>
>                         }
>
>                         AggregationStrategy surnameAggregator = new 
> AggregationStrategy() {
>
>                                 public Exchange aggregate(Exchange 
> oldExchange,
>                                                 Exchange newExchange) {
>
>                                         debugIn("Surname Aggregator", 
> oldExchange, newExchange);
>
>                                         Message oldIn = oldExchange.getIn();
>                                         Message newIn = newExchange.getIn();
>
>                                         List<String> brothers = null;
>                                         if (oldIn.getBody() instanceof List) {
>
>                                                 brothers = 
> oldIn.getBody(List.class);
>                                                 
> brothers.add(newIn.getBody(String.class));
>
>                                         } else {
>
>                                                 brothers = new 
> ArrayList<String>();
>                                                 
> brothers.add(oldIn.getBody(String.class));
>                                                 
> brothers.add(newIn.getBody(String.class));
>                                                 
> oldExchange.getIn().setBody(brothers);
>
>                                         } // else
>
>                                         debugOut("Surname Aggregator", 
> oldExchange);
>
>                                         return oldExchange;
>
>                                 }
>
>                         };
>
>                         AggregationStrategy brothersAggregator = new 
> AggregationStrategy() {
>
>                                 public Exchange aggregate(Exchange 
> oldExchange,
>                                                 Exchange newExchange) {
>
>                                         debugIn("Brothers Aggregator", 
> oldExchange, newExchange);
>
>                                         Message oldIn = oldExchange.getIn();
>                                         Message newIn = newExchange.getIn();
>
>                                         Map<String, List> brothers = null;
>                                         if (oldIn.getBody() instanceof Map) {
>
>                                                 brothers = 
> oldIn.getBody(Map.class);
>                                                 
> brothers.put(newIn.getHeader(SURNAME_HEADER,
>                                                                 
> String.class), newIn.getBody(List.class));
>
>                                         } else {
>
>                                                 brothers = new 
> HashMap<String, List>();
>                                                 
> brothers.put(oldIn.getHeader(SURNAME_HEADER, String.class),
>                                                                 
> oldIn.getBody(List.class));
>                                                 
> brothers.put(newIn.getHeader(SURNAME_HEADER,
>                                                                 
> String.class), newIn.getBody(List.class));
>                                                 
> oldExchange.getIn().setBody(brothers);
>
>                                         } // else
>
>                                         debugOut("Brothers Aggregator", 
> oldExchange);
>
>                                         return oldExchange;
>
>                                 }
>
>                         };
>
>                         @Override
>                         public void configure() throws Exception {
>
>                                 from("direct:start")
>                                                 // Separate people
>                                                 
> .splitter(bodyAs(String.class).tokenize(",")).process(
>
>                                                 // Split the name, erase the 
> surname and put it in a
>                                                 // header
>                                                                 new 
> Processor() {
>
>                                                                         
> public void process(Exchange exchange)
>                                                                               
>           throws Exception {
>
>                                                                               
>   String[] parts = exchange.getIn()
>                                                                               
>                   .getBody(String.class).split(
>                                                                               
>                                   " ");
>                                                                               
>   exchange.getIn().setBody(parts[0]);
>                                                                               
>   exchange.getIn().setHeader(
>                                                                               
>                   SURNAME_HEADER, parts[1]);
>
>                                                                         } // 
> process
>
>                                                                 }) // 
> Processor
>
>                                                 .to("direct:joinSurnames");
>
>                                 // FIXME: This aggregator doesn´t usually 
> fail but could also due to
> timeout
>                                 // or an incorrect batch size
>                                 // Join in a list by the surname on the 
> header and mark as
>                                 // brothers list
>                                 from("direct:joinSurnames")
>                                 .aggregator(header(SURNAME_HEADER),
>                                                 
> surnameAggregator).setHeader(TYPE_HEADER,
>                                                 
> constant(BROTHERS_TYPE)).to("direct:joinBrothers");
>
>                                 // Join all brothers lists and remove surname 
> and type headers
>                                 AggregatorType agg =
> from("direct:joinBrothers").aggregator(header(TYPE_HEADER),
>                                                 brothersAggregator);
>
>                                 // FIXME: If these lines get commented the 
> test fails some times with
> different errors
>                                 // due to a timeout or incorrect batch size 
> that must be adjusted by
> hand
>                                 // There are two brothers lists to join but 
> we don´t know always the
> number "a priori"
>                                 agg.setBatchSize(2);
>                                 agg.setBatchTimeout(10000);
>                                 agg.removeHeader(SURNAME_HEADER)
>                                 .removeHeader(TYPE_HEADER)
>                                 .to("mock:result");
>
>                         }
>
>                 };
>
>         }
>
> }
> // ------------ END ---------------//
>
> --
> View this message in context: 
> http://www.nabble.com/Aggregator-strategies-%28again%29-tf4750834s22882.html#a13584751
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>

Reply via email to