BTW Alberto I've added your test case to the distro so folks can
noodle on it and see if we can improve things some. See
AlbertoAggregatorTest in the camel-core module.
I commented out the use of the set batch size; as we don't really need
to worry about that in this case, as we can just assume if the timeout
fires after a few seconds we've got to the end of any possible batch.
The tricky thing is knowing when you're at the end of the batch
really. One simple solution would be to add some kinda predicate to
detect batch-completion. For example when we split messages we could
record how many split messages there are and each messages' counter so
that we know when we've aggregated them all together again?
On 05/11/2007, almilo <[EMAIL PROTECTED]> wrote:
>
> Hi all:
>
> First, Camel is a very interesting project. Congrats to the team!!
>
> Now the question...
>
> I´ve seen a post about better support for Aggregator pattern but with no
> answer. The testcases seem to be very simple and I think this is a really
> relevant pattern for distributed processing. But, being based in "a priori"
> batch size and timeout it lacks value for most of the uses I can think of.
>
> Any plans for an Aggregator face-lift? :)
>
> I attach an imaginative testcase :O) with some comments on FIXMEs
>
> Lot of thanks, Alberto Mijares
>
> // ------------ START ---------------//
> package org.fundacionctic.taw;
>
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
>
> import org.apache.camel.CamelTemplate;
> import org.apache.camel.ContextTestSupport;
> import org.apache.camel.Exchange;
> import org.apache.camel.Message;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.model.AggregatorType;
> import org.apache.camel.processor.aggregate.AggregationStrategy;
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
>
> public class AggregatorTest extends ContextTestSupport {
>
> private Log log = LogFactory.getLog(this.getClass());
>
> private static final String SURNAME_HEADER = "surname";
>
> private static final String TYPE_HEADER = "type";
>
> private static final String BROTHERS_TYPE = "brothers";
>
> public void testAggregator() throws Exception {
>
> String allNames = "Harpo Marx,Fiodor Karamazov,Chico Marx,Ivan
> Karamazov,Groucho Marx,Alexei Karamazov,Dimitri Karamazov";
>
> List<String> marxBrothers = new ArrayList<String>();
> marxBrothers.add("Harpo");
> marxBrothers.add("Chico");
> marxBrothers.add("Groucho");
>
> List<String> karamazovBrothers = new ArrayList<String>();
> karamazovBrothers.add("Fiodor");
> karamazovBrothers.add("Ivan");
> karamazovBrothers.add("Alexei");
> karamazovBrothers.add("Dimitri");
>
> Map<String, List> allBrothers = new HashMap<String, List>();
> allBrothers.put("Marx", marxBrothers);
> allBrothers.put("Karamazov", karamazovBrothers);
>
> MockEndpoint resultEndpoint =
> resolveMandatoryEndpoint("mock:result",
> MockEndpoint.class);
> resultEndpoint.expectedMessageCount(1);
> resultEndpoint.expectedBodiesReceived(allBrothers);
>
> CamelTemplate template = new CamelTemplate(context);
> template.sendBody("direct:start", allNames);
>
> resultEndpoint.assertIsSatisfied();
>
> }
>
> @Override
> protected RouteBuilder createRouteBuilder() throws Exception {
>
> return new RouteBuilder() {
>
> private void debugIn(String stringId, Exchange
> oldExchange,
> Exchange newExchange) {
>
> log.debug(stringId + " old headers in: "
> +
> oldExchange.getIn().getHeaders());
> log.debug(stringId + " old body in: "
> +
> oldExchange.getIn().getBody());
> log.debug(stringId + " new headers in: "
> +
> newExchange.getIn().getHeaders());
> log.debug(stringId + " new body in: "
> +
> newExchange.getIn().getBody());
>
> }
>
> private void debugOut(String stringId, Exchange
> exchange) {
>
> log.debug(stringId + " old headers out: "
> +
> exchange.getIn().getHeaders());
> log.debug(stringId + " old body out: "
> + exchange.getIn().getBody());
>
> }
>
> AggregationStrategy surnameAggregator = new
> AggregationStrategy() {
>
> public Exchange aggregate(Exchange
> oldExchange,
> Exchange newExchange) {
>
> debugIn("Surname Aggregator",
> oldExchange, newExchange);
>
> Message oldIn = oldExchange.getIn();
> Message newIn = newExchange.getIn();
>
> List<String> brothers = null;
> if (oldIn.getBody() instanceof List) {
>
> brothers =
> oldIn.getBody(List.class);
>
> brothers.add(newIn.getBody(String.class));
>
> } else {
>
> brothers = new
> ArrayList<String>();
>
> brothers.add(oldIn.getBody(String.class));
>
> brothers.add(newIn.getBody(String.class));
>
> oldExchange.getIn().setBody(brothers);
>
> } // else
>
> debugOut("Surname Aggregator",
> oldExchange);
>
> return oldExchange;
>
> }
>
> };
>
> AggregationStrategy brothersAggregator = new
> AggregationStrategy() {
>
> public Exchange aggregate(Exchange
> oldExchange,
> Exchange newExchange) {
>
> debugIn("Brothers Aggregator",
> oldExchange, newExchange);
>
> Message oldIn = oldExchange.getIn();
> Message newIn = newExchange.getIn();
>
> Map<String, List> brothers = null;
> if (oldIn.getBody() instanceof Map) {
>
> brothers =
> oldIn.getBody(Map.class);
>
> brothers.put(newIn.getHeader(SURNAME_HEADER,
>
> String.class), newIn.getBody(List.class));
>
> } else {
>
> brothers = new
> HashMap<String, List>();
>
> brothers.put(oldIn.getHeader(SURNAME_HEADER, String.class),
>
> oldIn.getBody(List.class));
>
> brothers.put(newIn.getHeader(SURNAME_HEADER,
>
> String.class), newIn.getBody(List.class));
>
> oldExchange.getIn().setBody(brothers);
>
> } // else
>
> debugOut("Brothers Aggregator",
> oldExchange);
>
> return oldExchange;
>
> }
>
> };
>
> @Override
> public void configure() throws Exception {
>
> from("direct:start")
> // Separate people
>
> .splitter(bodyAs(String.class).tokenize(",")).process(
>
> // Split the name, erase the
> surname and put it in a
> // header
> new
> Processor() {
>
>
> public void process(Exchange exchange)
>
> throws Exception {
>
>
> String[] parts = exchange.getIn()
>
> .getBody(String.class).split(
>
> " ");
>
> exchange.getIn().setBody(parts[0]);
>
> exchange.getIn().setHeader(
>
> SURNAME_HEADER, parts[1]);
>
> } //
> process
>
> }) //
> Processor
>
> .to("direct:joinSurnames");
>
> // FIXME: This aggregator doesn´t usually
> fail but could also due to
> timeout
> // or an incorrect batch size
> // Join in a list by the surname on the
> header and mark as
> // brothers list
> from("direct:joinSurnames")
> .aggregator(header(SURNAME_HEADER),
>
> surnameAggregator).setHeader(TYPE_HEADER,
>
> constant(BROTHERS_TYPE)).to("direct:joinBrothers");
>
> // Join all brothers lists and remove surname
> and type headers
> AggregatorType agg =
> from("direct:joinBrothers").aggregator(header(TYPE_HEADER),
> brothersAggregator);
>
> // FIXME: If these lines get commented the
> test fails some times with
> different errors
> // due to a timeout or incorrect batch size
> that must be adjusted by
> hand
> // There are two brothers lists to join but
> we don´t know always the
> number "a priori"
> agg.setBatchSize(2);
> agg.setBatchTimeout(10000);
> agg.removeHeader(SURNAME_HEADER)
> .removeHeader(TYPE_HEADER)
> .to("mock:result");
>
> }
>
> };
>
> }
>
> }
> // ------------ END ---------------//
>
>
> --
> View this message in context:
> http://www.nabble.com/Aggregator-strategies-%28again%29-tf4750834s22882.html#a13584751
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>
--
James
-------
http://macstrac.blogspot.com/
Open Source SOA
http://open.iona.com