On 05/11/2007, Roman Kalukiewicz <[EMAIL PROTECTED]> wrote: > Hello! > > I was thinking about this problem a little, and I would like to ask > someone more fluent in camel about his/her thoughts about the thing. > > I thought about creating common base component based on something like > DelayedQueue class where whenever you receive an exchange you will > execute strategy class that is responsible for creating > DelayedExchange class (wrapper for Exchange that implements Delayed) > and putting it to DelayedQueue.
Interesting. You could reuse the SedaEndpoint class; which is an endpoint that uses any BlockingQueue implementation; though I guess we'd need to modify it a little to deal with the wrapping of a Delayed object; so reuse might not be as clean as we'd like. BTW we have the delayer... http://activemq.apache.org/camel/delayer.html which is kinda similar conceptually - though the Delayer is really simple in that it just delays the current message's delivery by a time expression. Another approach could be to couple this will a resequencer to reorder the messages based on their expiration time (soonest first) before feeding it into the delayer. The only worry with things like using a DelayedQueue is transactions & reliability - e.g. if your application terminates - do you loose messages? Also if you're not careful the DelayedQueue can get rather large. But there are so many different use cases; I can definitely see the attraction of the DelayedQueue idea. FWIW folks have wanted a delayed message dispatch in ActiveMQ for some time... http://issues.apache.org/activemq/browse/AMQ-499 so it'd be great to provide this feature! I guess we could one day provide alternative DelayedQueue implementations; such as some kind of persistent delayed queue so that messages can be sent reliably to the delayed queue (i.e. the send can be a database insert). i.e. use the same Delayer component/endpoint but allow a different factory to be provided to implement the DelayedQueue container. FWIW the BAM module... http://cwiki.apache.org/CAMEL/bam.html has a JPA based delay logic used to find expiration based alerts of business processes; its in the ActivityMonitorEngine which in pseudocode does... final Date timeNow = new Date(now); transactionTemplate.execute(new TransactionCallbackWithoutResult() { protected void doInTransactionWithoutResult(TransactionStatus status) { List<ActivityState> list = template.find("select x from ActivityState x where x.timeOverdue < ?1", timeNow); for (ActivityState activityState : list) { process(activityState); } } }); So I guess a JPA based implementation of the DelayedQueue would be pretty easy to do; the hardest bit is figuring out how to persist the message headers & body cleanly with JPA (but then that'd be a handy thing anyway). > Then you have one thread that just monitors this DelayedQueue and > sends any exchange that is retrieved from this queue (very similar to > StreamResequencer). > > This way we can create Delayer that will not block current thread. Sounds cool to me. > On the other hand we can create this strategy in a way that on every > new exchange it will remove pending exchange from DelayedQueue, modify > it (using some AggregatorStrategy) and put it once again to this > Queue. If it will notice that aggregation is complete it will add > aggregated Exchange to this queue with delay == 0. > > Logic to specify correlationId, default delay or batch size can be > exposed as an Expression. > > I've already started to code it this way so if you have any comments > for this then let me know. > I hope that if it will be finished it could be included in camel distribution. Great! Looking forward to your patch :) -- James ------- http://macstrac.blogspot.com/ Open Source SOA http://open.iona.com
