Thanks for this explanation. I have one more question targeting the
completed Predicate? Where and how can I specify that?

My aggregator now looks like that
from("hospital:camel.hospital.nurse.aggregator.doctor")
.aggregator(header("JMSCorrelationID"), aggregationStrategy)
.to("hospital:camel.hospital.nurse.aggregator.doctor.receiver"); 

My AggregationStrategy is that:
final AggregationStrategy aggregationStrategy = new AggregationStrategy() {
        public Exchange aggregate(Exchange oldExchange, Exchange
newExchange) {
                
        Object oldObject = oldExchange.getIn().getBody();
            if(oldObject instanceof ElementImpl) {
                ElementImpl oldBody =
(ElementImpl)oldExchange.getIn().getBody();
                        ElementImpl newBody =
(ElementImpl)newExchange.getIn().getBody();
                    
                        DocumentImpl doc = new DocumentImpl();
                        Element root = doc.createElement("document");
                        //import oldNode to new Document
                        Node oldElem = doc.importNode(oldBody, true);
                        Node newElem = doc.importNode(newBody, true);
                        
                        root.appendChild(oldElem);
                        root.appendChild(newElem);
                        doc.appendChild(root);
                        
                        Exchange aggregatedExchange = new
DefaultExchange(context);
                        aggregatedExchange.getIn().setBody(doc);
                    return aggregatedExchange;
                    
            } else if(oldObject instanceof DocumentImpl) {
                //get document
                DocumentImpl oldDoc = (DocumentImpl)oldObject;
                //get new element
                ElementImpl newBody =
(ElementImpl)newExchange.getIn().getBody();
                //import new element into document
                Node newElem = oldDoc.importNode(newBody, true);
                //get root element of document
                Node root = oldDoc.getFirstChild();
                //remove root from document
                oldDoc.removeChild(root);
                
                //append new element to root
                root.appendChild(newElem);
                oldDoc.appendChild(root);
                
                Exchange aggregatedExchange = new DefaultExchange(context);
                aggregatedExchange.getIn().setBody(oldDoc);
                   return aggregatedExchange;
            }
            return oldExchange;
     }
};


Has the completed Predicate be specified in my AggregationStrategy or at my
route? Do you have an example for that?

Thanks
Robert

-----Ursprüngliche Nachricht-----
Von: Roman Kalukiewicz [mailto:[EMAIL PROTECTED] 
Gesendet: Montag, 14. Jänner 2008 11:23
An: camel-user@activemq.apache.org
Betreff: Re: AW: AW: Aggregation problems


To clarify few things that were told in this thread.

I believe that your correlationId could be specified in some easier way.
Instead of creating expression instance it should be enough to
use:

.aggregator(header("JMSCorrelationID"), aggregationStrategy)

as 'header("JMSCorrelationID")' is already an expression.

About those batch sizes and timeouts it works in a tricky way. They are used
ONLY if you don't have a completedPredicate specified. If you have it, then
other settings are ignored. What is important - this predicate is evaluated
on your aggregated exchange when message arrives - so it is hard to have a
timeout condition here.

When no completedPredicate is specified then we assume that it is time to
send our aggregated batches when:
* overall number of collected messages since last send is >= batchSize (it
is not a size of some batch) or
* on timeout (that ticks on its own - it is not 'event driven' so it doesn't
start when first exchange is received - it just starts when context is
started)

In your particular case, when you don't know how many messages could belong
to the batch - is there any way to 'evaluate it'? If so, then completed
Predicate is for you. If there is no way to evaluate it, then you need some
kind of timeout (unfortunately at the moment we cannot specify a condition
like "send my batch when there is no message for it for 5 sec" - we rather
have "send my batch every 5 sec whatever we have aggregated").

I hope it clears it a little (should we update our Aggregator wiki page? ;)
)

Roman

Reply via email to