Thanks for this explanation. I have one more question targeting the completed Predicate? Where and how can I specify that?
My aggregator now looks like that from("hospital:camel.hospital.nurse.aggregator.doctor") .aggregator(header("JMSCorrelationID"), aggregationStrategy) .to("hospital:camel.hospital.nurse.aggregator.doctor.receiver"); My AggregationStrategy is that: final AggregationStrategy aggregationStrategy = new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Object oldObject = oldExchange.getIn().getBody(); if(oldObject instanceof ElementImpl) { ElementImpl oldBody = (ElementImpl)oldExchange.getIn().getBody(); ElementImpl newBody = (ElementImpl)newExchange.getIn().getBody(); DocumentImpl doc = new DocumentImpl(); Element root = doc.createElement("document"); //import oldNode to new Document Node oldElem = doc.importNode(oldBody, true); Node newElem = doc.importNode(newBody, true); root.appendChild(oldElem); root.appendChild(newElem); doc.appendChild(root); Exchange aggregatedExchange = new DefaultExchange(context); aggregatedExchange.getIn().setBody(doc); return aggregatedExchange; } else if(oldObject instanceof DocumentImpl) { //get document DocumentImpl oldDoc = (DocumentImpl)oldObject; //get new element ElementImpl newBody = (ElementImpl)newExchange.getIn().getBody(); //import new element into document Node newElem = oldDoc.importNode(newBody, true); //get root element of document Node root = oldDoc.getFirstChild(); //remove root from document oldDoc.removeChild(root); //append new element to root root.appendChild(newElem); oldDoc.appendChild(root); Exchange aggregatedExchange = new DefaultExchange(context); aggregatedExchange.getIn().setBody(oldDoc); return aggregatedExchange; } return oldExchange; } }; Has the completed Predicate be specified in my AggregationStrategy or at my route? Do you have an example for that? Thanks Robert -----Ursprüngliche Nachricht----- Von: Roman Kalukiewicz [mailto:[EMAIL PROTECTED] Gesendet: Montag, 14. Jänner 2008 11:23 An: camel-user@activemq.apache.org Betreff: Re: AW: AW: Aggregation problems To clarify few things that were told in this thread. I believe that your correlationId could be specified in some easier way. Instead of creating expression instance it should be enough to use: .aggregator(header("JMSCorrelationID"), aggregationStrategy) as 'header("JMSCorrelationID")' is already an expression. About those batch sizes and timeouts it works in a tricky way. They are used ONLY if you don't have a completedPredicate specified. If you have it, then other settings are ignored. What is important - this predicate is evaluated on your aggregated exchange when message arrives - so it is hard to have a timeout condition here. When no completedPredicate is specified then we assume that it is time to send our aggregated batches when: * overall number of collected messages since last send is >= batchSize (it is not a size of some batch) or * on timeout (that ticks on its own - it is not 'event driven' so it doesn't start when first exchange is received - it just starts when context is started) In your particular case, when you don't know how many messages could belong to the batch - is there any way to 'evaluate it'? If so, then completed Predicate is for you. If there is no way to evaluate it, then you need some kind of timeout (unfortunately at the moment we cannot specify a condition like "send my batch when there is no message for it for 5 sec" - we rather have "send my batch every 5 sec whatever we have aggregated"). I hope it clears it a little (should we update our Aggregator wiki page? ;) ) Roman