Hi all:

First, Camel is a very interesting project. Congrats to the team!!

Now the question...

I´ve seen a post about better support for Aggregator pattern but with no
answer. The testcases seem to be very simple and I think this is a really
relevant pattern for distributed processing. But, being based in "a priori"
batch size and timeout it lacks value for most of the uses I can think of.

Any plans for an Aggregator face-lift? :)

I attach an imaginative testcase :O) with some comments on FIXMEs

Lot of thanks, Alberto Mijares

// ------------ START ---------------//
package org.fundacionctic.taw;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.camel.CamelTemplate;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.AggregatorType;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class AggregatorTest extends ContextTestSupport {

        private Log log = LogFactory.getLog(this.getClass());

        private static final String SURNAME_HEADER = "surname";

        private static final String TYPE_HEADER = "type";

        private static final String BROTHERS_TYPE = "brothers";

        public void testAggregator() throws Exception {

                String allNames = "Harpo Marx,Fiodor Karamazov,Chico Marx,Ivan
Karamazov,Groucho Marx,Alexei Karamazov,Dimitri Karamazov";

                List<String> marxBrothers = new ArrayList<String>();
                marxBrothers.add("Harpo");
                marxBrothers.add("Chico");
                marxBrothers.add("Groucho");

                List<String> karamazovBrothers = new ArrayList<String>();
                karamazovBrothers.add("Fiodor");
                karamazovBrothers.add("Ivan");
                karamazovBrothers.add("Alexei");
                karamazovBrothers.add("Dimitri");

                Map<String, List> allBrothers = new HashMap<String, List>();
                allBrothers.put("Marx", marxBrothers);
                allBrothers.put("Karamazov", karamazovBrothers);

                MockEndpoint resultEndpoint = 
resolveMandatoryEndpoint("mock:result",
                                MockEndpoint.class);
                resultEndpoint.expectedMessageCount(1);
                resultEndpoint.expectedBodiesReceived(allBrothers);

                CamelTemplate template = new CamelTemplate(context);
                template.sendBody("direct:start", allNames);

                resultEndpoint.assertIsSatisfied();

        }

        @Override
        protected RouteBuilder createRouteBuilder() throws Exception {

                return new RouteBuilder() {

                        private void debugIn(String stringId, Exchange 
oldExchange,
                                        Exchange newExchange) {

                                log.debug(stringId + " old headers in: "
                                                + 
oldExchange.getIn().getHeaders());
                                log.debug(stringId + " old body in: "
                                                + 
oldExchange.getIn().getBody());
                                log.debug(stringId + " new headers in: "
                                                + 
newExchange.getIn().getHeaders());
                                log.debug(stringId + " new body in: "
                                                + 
newExchange.getIn().getBody());

                        }

                        private void debugOut(String stringId, Exchange 
exchange) {

                                log.debug(stringId + " old headers out: "
                                                + 
exchange.getIn().getHeaders());
                                log.debug(stringId + " old body out: "
                                                + exchange.getIn().getBody());

                        }

                        AggregationStrategy surnameAggregator = new 
AggregationStrategy() {

                                public Exchange aggregate(Exchange oldExchange,
                                                Exchange newExchange) {

                                        debugIn("Surname Aggregator", 
oldExchange, newExchange);

                                        Message oldIn = oldExchange.getIn();
                                        Message newIn = newExchange.getIn();

                                        List<String> brothers = null;
                                        if (oldIn.getBody() instanceof List) {

                                                brothers = 
oldIn.getBody(List.class);
                                                
brothers.add(newIn.getBody(String.class));

                                        } else {

                                                brothers = new 
ArrayList<String>();
                                                
brothers.add(oldIn.getBody(String.class));
                                                
brothers.add(newIn.getBody(String.class));
                                                
oldExchange.getIn().setBody(brothers);

                                        } // else

                                        debugOut("Surname Aggregator", 
oldExchange);

                                        return oldExchange;

                                }

                        };

                        AggregationStrategy brothersAggregator = new 
AggregationStrategy() {

                                public Exchange aggregate(Exchange oldExchange,
                                                Exchange newExchange) {

                                        debugIn("Brothers Aggregator", 
oldExchange, newExchange);

                                        Message oldIn = oldExchange.getIn();
                                        Message newIn = newExchange.getIn();

                                        Map<String, List> brothers = null;
                                        if (oldIn.getBody() instanceof Map) {

                                                brothers = 
oldIn.getBody(Map.class);
                                                
brothers.put(newIn.getHeader(SURNAME_HEADER,
                                                                String.class), 
newIn.getBody(List.class));

                                        } else {

                                                brothers = new HashMap<String, 
List>();
                                                
brothers.put(oldIn.getHeader(SURNAME_HEADER, String.class),
                                                                
oldIn.getBody(List.class));
                                                
brothers.put(newIn.getHeader(SURNAME_HEADER,
                                                                String.class), 
newIn.getBody(List.class));
                                                
oldExchange.getIn().setBody(brothers);

                                        } // else

                                        debugOut("Brothers Aggregator", 
oldExchange);

                                        return oldExchange;

                                }

                        };

                        @Override
                        public void configure() throws Exception {

                                from("direct:start")
                                                // Separate people
                                                
.splitter(bodyAs(String.class).tokenize(",")).process(

                                                // Split the name, erase the 
surname and put it in a
                                                // header
                                                                new Processor() 
{

                                                                        public 
void process(Exchange exchange)
                                                                                
        throws Exception {

                                                                                
String[] parts = exchange.getIn()
                                                                                
                .getBody(String.class).split(
                                                                                
                                " ");
                                                                                
exchange.getIn().setBody(parts[0]);
                                                                                
exchange.getIn().setHeader(
                                                                                
                SURNAME_HEADER, parts[1]);

                                                                        } // 
process

                                                                }) // Processor

                                                .to("direct:joinSurnames");

                                // FIXME: This aggregator doesn´t usually fail 
but could also due to
timeout
                                // or an incorrect batch size
                                // Join in a list by the surname on the header 
and mark as
                                // brothers list
                                from("direct:joinSurnames")
                                .aggregator(header(SURNAME_HEADER),
                                                
surnameAggregator).setHeader(TYPE_HEADER,
                                                
constant(BROTHERS_TYPE)).to("direct:joinBrothers");

                                // Join all brothers lists and remove surname 
and type headers
                                AggregatorType agg =
from("direct:joinBrothers").aggregator(header(TYPE_HEADER),
                                                brothersAggregator);
                                
                                // FIXME: If these lines get commented the test 
fails some times with
different errors
                                // due to a timeout or incorrect batch size 
that must be adjusted by
hand
                                // There are two brothers lists to join but we 
don´t know always the
number "a priori"
                                agg.setBatchSize(2);
                                agg.setBatchTimeout(10000);
                                agg.removeHeader(SURNAME_HEADER)
                                .removeHeader(TYPE_HEADER)
                                .to("mock:result");

                        }

                };

        }

}
// ------------ END ---------------//

-- 
View this message in context: 
http://www.nabble.com/Aggregator-strategies-%28again%29-tf4750834s22882.html#a13584751
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to