Hi,
I enabled the Tracer and the routing is what I would expect, but the problem
remains that
List result =(List) template.sendBody("direct:start", ExchangePattern.InOut,
myQueryString);
only contains ResultObjectB instead of the aggregated List.
I've written a smaller test case with plain Java DSL, where I just process
Strings and want to aggregate them and return the aggregated
exchange body:
ROUTE:
public void testAggregation() throws Exception {
    cc.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            MyAggregationStrategy ag = new MyAggregationStrategy();
            from("direct:start").setHeader("ID").constant(1)
                .multicast().setParallelProcessing(true)
                .to("direct:stage1", "direct:stage2");
            from("direct:stage1").process(new LogProcessor("A")).to("direct:merge");
            from("direct:stage2").process(new LogProcessor("B")).to("direct:merge");
            from("direct:merge").aggregator(header("ID"), ag).batchSize(2)
                .to("mock:end");
        }
    });
    String res = (String) pt.requestBodyAndHeader("direct:start", "test", "ID", 1);
    System.out.println("RESULT:\n" + res);
    assertEquals("A_test#B_test", res);
    MockEndpoint mock = (MockEndpoint) cc.getEndpoint("mock:end");
    mock.expectedMessageCount(1);
    mock.assertIsSatisfied();
}
LogProcessor:
public class LogProcessor implements Processor {
    private String name;

    public LogProcessor(String name) {
        this.name = name;
    }

    // Prefixes the incoming body with this processor's name, e.g. "A_test".
    public void process(Exchange exchange) throws Exception {
        exchange.getIn().setBody(name + "_" + exchange.getIn().getBody());
    }
}
MyAggregationStrategy:
public class MyAggregationStrategy implements AggregationStrategy {
    private Integer count = 0;

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        count = (Integer) oldExchange.getIn().getHeader("aggregated");
        if (count == null) {
            count = 1;
        }
        count++;
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        Exchange copy = newExchange.copy();
        copy.getIn().setBody(oldBody + "#" + newBody);
        copy.getIn().setHeader("aggregated", count);
        System.out.println(copy.getIn());
        return copy;
    }

    public boolean isCompleted() {
        if (count != null) {
            System.out.println("Aggregated? -> " + (getCount() == 2));
        }
        return getCount() == 2;
    }

    public int getCount() {
        return count;
    }
}
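As a minimal sketch of what the strategy above is meant to produce, the following plain-Java snippet folds two bodies the same way, with no Camel dependency. The class AggregationSketch and its methods are hypothetical and exist only for illustration; they are not part of Camel or of the test above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the Camel classes: only the folding logic is modelled.
final class AggregationSketch {

    // Joins two bodies the way MyAggregationStrategy does: old + "#" + new.
    static String aggregate(String oldBody, String newBody) {
        return oldBody + "#" + newBody;
    }

    // Folds all bodies that share one correlation ID into a single body.
    static String aggregateBatch(List<String> bodies) {
        String result = bodies.get(0);
        for (String body : bodies.subList(1, bodies.size())) {
            result = aggregate(result, body);
        }
        return result;
    }

    public static void main(String[] args) {
        // The two bodies produced by LogProcessor("A") and LogProcessor("B").
        System.out.println(aggregateBatch(Arrays.asList("A_test", "B_test")));
    }
}
```

With the two bodies from the trace, the fold yields A_test#B_test, which is the value the test expects back from direct:start.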
Here is the trace output:
07.11.2008 16:55:05 org.apache.camel.impl.DefaultCamelContext <init>
INFO: JMX is disabled. Using DefaultLifecycleStrategy.
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO:
<TRACER>
ID: ID-WUM128085/4570-1226073306211/0-0
Node: SetHeader[ID, 1]
Headers: {ID=1}
Body: test
Pattern: InOut
Time: 1226073306321
</TRACER>
07.11.2008 16:55:06 org.apache.camel.impl.converter.DefaultTypeConverter
addTypeConverter
WARNUNG: Overriding type converter from: StaticMethodTypeConverter: public
static java.lang.String
org.apache.camel.converter.IOConverter.toString(javax.xml.transform.Source)
throws javax.xml.transform.TransformerException,java.io.IOException to:
InstanceMethodTypeConverter: public java.lang.String
org.apache.camel.converter.jaxp.XmlConverter.toString(javax.xml.transform.Source)
throws javax.xml.transform.TransformerException
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO:
<TRACER>
ID: ID-WUM128085/4570-1226073306211/0-1
Node: To[direct:stage1]
Headers: {ID=1}
Body: test
Pattern: InOut
Time: 1226073306508
</TRACER>
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO:
<TRACER>
ID: ID-WUM128085/4570-1226073306211/0-1
Node: [EMAIL PROTECTED]
Headers: {ID=1}
Body: test
Pattern: InOut
Time: 1226073306508
</TRACER>
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO:
<TRACER>
ID: ID-WUM128085/4570-1226073306211/0-1
Node: To[direct:merge]
Headers: {ID=1}
Body: A_test
Pattern: InOut
Time: 1226073306508
</TRACER>
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO:
<TRACER>
ID: ID-WUM128085/4570-1226073306211/0-2
Node: To[direct:stage2]
Headers: {ID=1}
Body: test
Pattern: InOut
Time: 1226073306524
</TRACER>
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO:
<TRACER>
ID: ID-WUM128085/4570-1226073306211/0-2
Node: [EMAIL PROTECTED]
Headers: {ID=1}
Body: test
Pattern: InOut
Time: 1226073306524
</TRACER>
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO:
<TRACER>
ID: ID-WUM128085/4570-1226073306211/0-2
Node: To[direct:merge]
Headers: {ID=1}
Body: B_test
Pattern: InOut
Time: 1226073306524
</TRACER>
Message: A_test#B_test
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO:
<TRACER>
ID: ID-WUM128085/4570-1226073306211/0-3
Node: To[mock:end]
Headers: {aggregated=2, ID=1}
Body: A_test#B_test
Pattern: InOut
Time: 1226073306524
</TRACER>
This test fails with
junit.framework.ComparisonFailure: result ok? expected:<[A_test#]B_test> but
was:<[]B_test>
although mock:end receives the correct body.
What do I have to do to receive
A_test#B_test from my call:
String res = (String) pt.requestBodyAndHeader("direct:start", "test", "ID", 1);
Is this actually possible with direct endpoints?
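The observed B_test would be consistent with the caller receiving the multicast's own reply, combined with a "keep the latest reply" rule, rather than the output of the aggregator behind direct:merge. That reading is an assumption, not confirmed Camel behaviour. The plain-Java sketch below models only that idea; MulticastReplySketch, multicast and the two strategies are hypothetical names and none of this is Camel API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

// Hypothetical model of a request/reply multicast; not Camel code.
final class MulticastReplySketch {

    // Sends the same input to every branch and combines the branch replies
    // with the given strategy; the combined value is what the caller gets back.
    static String multicast(String input,
                            List<UnaryOperator<String>> branches,
                            BinaryOperator<String> strategy) {
        String reply = null;
        for (UnaryOperator<String> branch : branches) {
            String branchReply = branch.apply(input);
            reply = (reply == null) ? branchReply : strategy.apply(reply, branchReply);
        }
        return reply;
    }

    public static void main(String[] args) {
        // Stand-ins for the direct:stage1 and direct:stage2 routes.
        UnaryOperator<String> stageA = body -> "A_" + body;
        UnaryOperator<String> stageB = body -> "B_" + body;
        List<UnaryOperator<String>> branches = Arrays.asList(stageA, stageB);

        // Assumed default: the latest reply replaces the earlier one.
        BinaryOperator<String> useLatest = (oldReply, newReply) -> newReply;
        // Concatenating rule, same shape as MyAggregationStrategy.
        BinaryOperator<String> concat = (oldReply, newReply) -> oldReply + "#" + newReply;

        System.out.println(multicast("test", branches, useLatest)); // B_test
        System.out.println(multicast("test", branches, concat));    // A_test#B_test
    }
}
```

If the assumption holds, the combining rule would have to be applied where the reply is built, that is on the multicast itself, in the way the earlier route passed a strategy with multicast(new MyOutAggregationStrategy(), true).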
thx,
Rob
Claus Ibsen wrote:
>
> Hi
>
> You can use the tracer to trace how the exchanges are routed in Camel.
> http://activemq.apache.org/camel/tracer.html
>
> And I suppose you made a typo in this mail, as you wrote from("direct:a")
> twice:
> from("direct:a").process(new myProcessor()).bean(beanB,
> "methodB").to("direct:merge");
>
> That should be: from("direct:b")
>
>
>
> Best regards
>
> Claus Ibsen
> ......................................
> Silverbullet
> Skovsgårdsvænget 21
> 8362 Hørning
> Tlf. +45 2962 7576
> Web: www.silverbullet.dk
>
> -----Original Message-----
> From: RobMMS [mailto:[EMAIL PROTECTED]
> Sent: 27 October 2008 13:15
> To: [email protected]
> Subject: ExchangePatterns on Bean-Component
>
>
> Hi,
> I'm observing a somewhat strange behaviour in my route:
>
> from("direct:start").multicast(new MyOutAggregationStrategy(),
> true).to("direct:a", "direct:b");
>
> from("direct:a").process(new myProcessor()).bean(beanA,
> "methodA").to("direct:merge");
> from("direct:a").process(new myProcessor()).bean(beanB,
> "methodB").to("direct:merge");
>
> from("direct:merge").
> aggregator(header(QueryPreProcessor.HEADER_CORRELATION_ID), new
> ResultAggregationStrategy()).
> completedPredicate(header("aggregated").
> isEqualTo(header("parallelStagesCount"))).
> to("mock:end");
>
> List result =(List) template.sendBody("direct:start",
> ExchangePattern.InOut,
> myQueryString);
>
>
> What I want to do here is send a query String to direct:start, which then
> multicasts to my beans working in parallel. Each bean takes a String value
> as parameter in methodA/B and returns a ResultObjectA/B, which is set as the
> body automatically (I do not set this explicitly in the beans).
> My ResultAggregationStrategy then puts ResultObjectA and ResultObjectB
> into a List, which I want to get back as the result of the
> template.sendBody call.
>
>
> The problem is that most of the time my result is the return value of the
> first bean being processed; only sometimes (about 10% of the time) does it
> return the aggregated list as expected.
>
> I would expect that my route processes each step and, when the aggregated
> message reaches mock:end, the message's Out body is returned to the caller
> as the result.
> I'm not quite sure when I have to set the message's Out body and when it's
> OK to set the In body. I think that's the problem here, but I'm not sure.
>
> Maybe I'm completely wrong, so I hope you get what I need and can tell me
> what's wrong with my route ;)
>
>
> Thanks,
> Rob
> --
> View this message in context:
> http://www.nabble.com/ExchangePatterns-on-Bean-Component-tp20186540s22882p20186540.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>
>