Yes ,the  multicast  do not as you want.
Here is the MulticastProcessor's process code.
The processors is the "to" endpoints' processor, MulticastProcessor will call them one by one . We could not get any benefit by forking multi-thread to parallel calling the "to" endpoints process.
public void process(Exchange exchange) throws Exception {
       Exchange result = null;
       for (Processor producer : processors) {
           Exchange copy = copyExchangeStrategy(producer, exchange);
           producer.process(copy);
           if (aggregationStrategy != null) {
               if (result == null) {
                   result = copy;
               } else {
result = aggregationStrategy.aggregate(result, copy); } } }
       if (result != null) {
           ExchangeHelper.copyResults(exchange, result);
       }
   }

Maybe we could define a new ParallelizedMulticastProcessor for calling the to endpoint in different thread, and a barrier class could help us to get the calling threads aggregated.

BTW, you can find ThreadProcessor for Thread syntax implementation.

Any thought?

Willem

aswin.nair wrote:
willem.jiang wrote:
For aswin's case , How about ?

from("cxf:bean:soapMessageEndpoint").multicast(new
BodyOutAggregatingStrategy()).thread(3).to("direct:webservice1",
"direct:webservice2", "direct:webService3");
from("direct:webservice1").to("cxf://webservice1");
from("direct:webservice2").to("cxf://webservice2");
from("direct:webservice3").to("cxf://webservice3");


I tried the same route with slight modification, so that I can test it
rapidly using different DSLs and I have the following example

// a simple processor that just prints something and waits for 5 seconds Processor simpleProcessor = new Processor() {
                public void process(Exchange exchange) throws Exception {
                        System.out.println("Simple Processor processing stuff for 5 
seconds.");
                        Thread.sleep(1000 * 5);
                }               
};
    // routes from Willem's example slightly modified
        from("direct:start")
       .multicast().thread(3)
       .to("direct:webservice1","direct:webservice2", "direct:webService3");
                
       from("direct:webservice1").process(simpleProcessor);
       from("direct:webservice2").process(simpleProcessor);
       from("direct:webservice3").process(simpleProcessor);

    // invoke using camel template with a basic default exchange
    template.send("direct:start", exchange);

Output Simple Processor processsing stuff for 5 seconds.
[delay for 5 sec]
Simple Processor processsing stuff for 5 seconds.

So basically the routes "from"webservice1" and "from:webservice3" are
running sequentially and in same thread. The following route from("direct:start")
    .multicast()
    .thread(3).process(simpleProcessor).end()
    .thread(3).process(simpleProcessor).end()
    .thread(3).process(simpleProcessor).end();

Output
Thread[0] Simple Processor processsing stuff for 5 seconds. [delay for 5 sec] Thread[1] Simple Processor processsing stuff for 5 seconds. [delay for 5 sec] Thread[2] Simple Processor processsing stuff for 5 seconds.

I tried Chirno's suggestion also, but was unlucky.

Digging the code revealed that splitter/mulitcastprocessor etc process
message synchronously (as far as I understood it). Anyways if there is
anything I should be doing to get this working, please let me know and I  am
stuck on this part.

Reply via email to