Hi all:
First, Camel is a very interesting project. Congrats to the team!!
Now the question...
I've seen a post asking for better support for the Aggregator pattern, but it got
no answer. The existing test cases seem very simple, and I think this is a really
relevant pattern for distributed processing. But because it is based on an
"a priori" batch size and timeout, it lacks value for most of the uses I can think of.
Any plans for an Aggregator face-lift? :)
I attach an imaginative test case :O) with some comments marked as FIXMEs.
Many thanks, Alberto Mijares
// ------------ START ---------------//
package org.fundacionctic.taw;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.camel.CamelTemplate;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.AggregatorType;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class AggregatorTest extends ContextTestSupport {
private Log log = LogFactory.getLog(this.getClass());
private static final String SURNAME_HEADER = "surname";
private static final String TYPE_HEADER = "type";
private static final String BROTHERS_TYPE = "brothers";
public void testAggregator() throws Exception {
String allNames = "Harpo Marx,Fiodor Karamazov,Chico Marx,Ivan
Karamazov,Groucho Marx,Alexei Karamazov,Dimitri Karamazov";
List<String> marxBrothers = new ArrayList<String>();
marxBrothers.add("Harpo");
marxBrothers.add("Chico");
marxBrothers.add("Groucho");
List<String> karamazovBrothers = new ArrayList<String>();
karamazovBrothers.add("Fiodor");
karamazovBrothers.add("Ivan");
karamazovBrothers.add("Alexei");
karamazovBrothers.add("Dimitri");
Map<String, List> allBrothers = new HashMap<String, List>();
allBrothers.put("Marx", marxBrothers);
allBrothers.put("Karamazov", karamazovBrothers);
MockEndpoint resultEndpoint =
resolveMandatoryEndpoint("mock:result",
MockEndpoint.class);
resultEndpoint.expectedMessageCount(1);
resultEndpoint.expectedBodiesReceived(allBrothers);
CamelTemplate template = new CamelTemplate(context);
template.sendBody("direct:start", allNames);
resultEndpoint.assertIsSatisfied();
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
private void debugIn(String stringId, Exchange
oldExchange,
Exchange newExchange) {
log.debug(stringId + " old headers in: "
+
oldExchange.getIn().getHeaders());
log.debug(stringId + " old body in: "
+
oldExchange.getIn().getBody());
log.debug(stringId + " new headers in: "
+
newExchange.getIn().getHeaders());
log.debug(stringId + " new body in: "
+
newExchange.getIn().getBody());
}
private void debugOut(String stringId, Exchange
exchange) {
log.debug(stringId + " old headers out: "
+
exchange.getIn().getHeaders());
log.debug(stringId + " old body out: "
+ exchange.getIn().getBody());
}
AggregationStrategy surnameAggregator = new
AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange,
Exchange newExchange) {
debugIn("Surname Aggregator",
oldExchange, newExchange);
Message oldIn = oldExchange.getIn();
Message newIn = newExchange.getIn();
List<String> brothers = null;
if (oldIn.getBody() instanceof List) {
brothers =
oldIn.getBody(List.class);
brothers.add(newIn.getBody(String.class));
} else {
brothers = new
ArrayList<String>();
brothers.add(oldIn.getBody(String.class));
brothers.add(newIn.getBody(String.class));
oldExchange.getIn().setBody(brothers);
} // else
debugOut("Surname Aggregator",
oldExchange);
return oldExchange;
}
};
AggregationStrategy brothersAggregator = new
AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange,
Exchange newExchange) {
debugIn("Brothers Aggregator",
oldExchange, newExchange);
Message oldIn = oldExchange.getIn();
Message newIn = newExchange.getIn();
Map<String, List> brothers = null;
if (oldIn.getBody() instanceof Map) {
brothers =
oldIn.getBody(Map.class);
brothers.put(newIn.getHeader(SURNAME_HEADER,
String.class),
newIn.getBody(List.class));
} else {
brothers = new HashMap<String,
List>();
brothers.put(oldIn.getHeader(SURNAME_HEADER, String.class),
oldIn.getBody(List.class));
brothers.put(newIn.getHeader(SURNAME_HEADER,
String.class),
newIn.getBody(List.class));
oldExchange.getIn().setBody(brothers);
} // else
debugOut("Brothers Aggregator",
oldExchange);
return oldExchange;
}
};
@Override
public void configure() throws Exception {
from("direct:start")
// Separate people
.splitter(bodyAs(String.class).tokenize(",")).process(
// Split the name, erase the
surname and put it in a
// header
new Processor()
{
public
void process(Exchange exchange)
throws Exception {
String[] parts = exchange.getIn()
.getBody(String.class).split(
" ");
exchange.getIn().setBody(parts[0]);
exchange.getIn().setHeader(
SURNAME_HEADER, parts[1]);
} //
process
}) // Processor
.to("direct:joinSurnames");
// FIXME: This aggregator doesn´t usually fail
but could also due to
timeout
// or an incorrect batch size
// Join in a list by the surname on the header
and mark as
// brothers list
from("direct:joinSurnames")
.aggregator(header(SURNAME_HEADER),
surnameAggregator).setHeader(TYPE_HEADER,
constant(BROTHERS_TYPE)).to("direct:joinBrothers");
// Join all brothers lists and remove surname
and type headers
AggregatorType agg =
from("direct:joinBrothers").aggregator(header(TYPE_HEADER),
brothersAggregator);
// FIXME: If these lines get commented the test
fails some times with
different errors
// due to a timeout or incorrect batch size
that must be adjusted by
hand
// There are two brothers lists to join but we
don´t know always the
number "a priori"
agg.setBatchSize(2);
agg.setBatchTimeout(10000);
agg.removeHeader(SURNAME_HEADER)
.removeHeader(TYPE_HEADER)
.to("mock:result");
}
};
}
}
// ------------ END ---------------//
--
View this message in context:
http://www.nabble.com/Aggregator-strategies-%28again%29-tf4750834s22882.html#a13584751
Sent from the Camel - Users mailing list archive at Nabble.com.