Yes, multicast does not work the way you want.
Here is the MulticastProcessor's process code.
The processors are the processors of the "to" endpoints, and
MulticastProcessor calls them one by one, so forking multiple threads
does not get the "to" endpoints invoked in parallel.
public void process(Exchange exchange) throws Exception {
    Exchange result = null;
    for (Processor producer : processors) {
        Exchange copy = copyExchangeStrategy(producer, exchange);
        producer.process(copy);
        if (aggregationStrategy != null) {
            if (result == null) {
                result = copy;
            } else {
                result = aggregationStrategy.aggregate(result, copy);
            }
        }
    }
    if (result != null) {
        ExchangeHelper.copyResults(exchange, result);
    }
}
Maybe we could define a new ParallelizedMulticastProcessor that calls
each "to" endpoint in a different thread; a barrier class could help us
wait for the calling threads and aggregate their results.
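A rough sketch of that idea in plain Java, not actual Camel code. Step, Aggregator and ParallelMulticastSketch are made-up stand-ins for Processor, AggregationStrategy and the proposed processor, and a String body stands in for the Exchange. It assumes Java 8 or later and at least one step. ExecutorService.invokeAll plays the role of the barrier here, because it only returns once every task has finished.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: none of these types are real Camel classes.
class ParallelMulticastSketch {

    // Stand-in for a Camel Processor working on a plain String body.
    interface Step {
        String process(String body) throws Exception;
    }

    // Stand-in for an AggregationStrategy.
    interface Aggregator {
        String aggregate(String oldResult, String newResult);
    }

    static String multicast(List<Step> steps, Aggregator aggregator, String body) throws Exception {
        // One thread per step; assumes the list is not empty.
        ExecutorService pool = Executors.newFixedThreadPool(steps.size());
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (Step step : steps) {
                tasks.add(() -> step.process(body));
            }
            // invokeAll blocks until all tasks are done, so it acts as the barrier.
            List<Future<String>> futures = pool.invokeAll(tasks);
            String result = null;
            // Futures come back in task order, so aggregation order is stable.
            for (Future<String> future : futures) {
                String reply = future.get(); // a failed step surfaces as ExecutionException
                result = (result == null) ? reply : aggregator.aggregate(result, reply);
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Step slow = body -> {
            Thread.sleep(500); // simulate a slow web service call
            return body + " handled on " + Thread.currentThread().getName();
        };
        long start = System.currentTimeMillis();
        String result = multicast(Arrays.asList(slow, slow, slow), (a, b) -> a + "\n" + b, "request");
        System.out.println(result);
        System.out.println("Elapsed ms: " + (System.currentTimeMillis() - start));
    }
}
```

With three steps that each sleep for 500 ms, the elapsed time should be close to one sleep rather than three if the calls really overlap. A real implementation would also need to copy the Exchange per endpoint, as the sequential code above does.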
BTW, you can look at ThreadProcessor for the implementation of the
thread() syntax.
Any thoughts?
Willem
aswin.nair wrote:
willem.jiang wrote:
For aswin's case, how about this?
from("cxf:bean:soapMessageEndpoint")
    .multicast(new BodyOutAggregatingStrategy()).thread(3)
    .to("direct:webservice1", "direct:webservice2", "direct:webService3");
from("direct:webservice1").to("cxf://webservice1");
from("direct:webservice2").to("cxf://webservice2");
from("direct:webservice3").to("cxf://webservice3");
I tried the same route with a slight modification, so that I could test
it quickly using different DSLs. Here is my example:
// a simple processor that just prints something and waits for 5 seconds
Processor simpleProcessor = new Processor() {
    public void process(Exchange exchange) throws Exception {
        System.out.println("Simple Processor processing stuff for 5 seconds.");
        Thread.sleep(1000 * 5);
    }
};
// routes from Willem's example, slightly modified
from("direct:start")
    .multicast().thread(3)
    .to("direct:webservice1", "direct:webservice2", "direct:webService3");
from("direct:webservice1").process(simpleProcessor);
from("direct:webservice2").process(simpleProcessor);
from("direct:webservice3").process(simpleProcessor);
// invoke using camel template with a basic default exchange
template.send("direct:start", exchange);
Output:
Simple Processor processing stuff for 5 seconds.
[delay for 5 sec]
Simple Processor processing stuff for 5 seconds.
So basically the routes from("direct:webservice1") and
from("direct:webservice3") run sequentially, in the same thread.
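A quick way to double-check which thread runs each route is to tag the printed message with the current thread's name. A minimal sketch; ThreadNameProbe and tag are made-up names for illustration, not part of Camel:

```java
// Hypothetical helper, not a Camel class.
class ThreadNameProbe {

    // Prefixes the message with the name of the thread that calls this method.
    static String tag(String message) {
        return "[" + Thread.currentThread().getName() + "] " + message;
    }
}
```

Calling System.out.println(ThreadNameProbe.tag("...")) inside the processor would then show the same thread name on every line if the routes really share one thread.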
I also tried the following route:
from("direct:start")
    .multicast()
    .thread(3).process(simpleProcessor).end()
    .thread(3).process(simpleProcessor).end()
    .thread(3).process(simpleProcessor).end();
Output:
Thread[0] Simple Processor processing stuff for 5 seconds.
[delay for 5 sec]
Thread[1] Simple Processor processing stuff for 5 seconds.
[delay for 5 sec]
Thread[2] Simple Processor processing stuff for 5 seconds.
I tried Chirno's suggestion as well, but had no luck.
Digging through the code showed that the splitter, MulticastProcessor
and similar processors handle messages synchronously (as far as I
understand it). If there is anything I should be doing to get this
working, please let me know, as I am stuck on this part.