On 05/11/2007, Roman Kalukiewicz <[EMAIL PROTECTED]> wrote:
> Hello!
>
> I was thinking about this problem a little, and I would like to ask
> someone more fluent in camel about his/her thoughts about the thing.
>
> I thought about creating common base component based on something like
> DelayedQueue class where whenever you receive an exchange you will
> execute strategy class that is responsible for creating
> DelayedExchange class (wrapper for Exchange that implements Delayed)
> and putting it to DelayedQueue.

Interesting. You could reuse the SedaEndpoint class, which is an
endpoint that works with any BlockingQueue implementation. I guess
we'd need to modify it a little to deal with the wrapping of a Delayed
object though, so reuse might not be as clean as we'd like.

BTW we have the delayer...
http://activemq.apache.org/camel/delayer.html

which is kinda similar conceptually - though the Delayer is really
simple in that it just delays the current message's delivery by a time
expression. Another approach could be to couple this with a
resequencer to reorder the messages based on their expiration time
(soonest first) before feeding them into the delayer.

The only worry with things like using a DelayedQueue is transactions &
reliability - e.g. if your application terminates, do you lose
messages? Also if you're not careful the DelayedQueue can get rather
large. But there are so many different use cases; I can definitely see
the attraction of the DelayedQueue idea.

FWIW folks have wanted a delayed message dispatch in ActiveMQ for some time...
http://issues.apache.org/activemq/browse/AMQ-499

so it'd be great to provide this feature!

I guess we could one day provide alternative DelayedQueue
implementations, such as some kind of persistent delayed queue so that
messages can be sent reliably to the delayed queue (i.e. the send can
be a database insert). That is, use the same Delayer component/endpoint
but allow a different factory to be provided to implement the
DelayedQueue container.

FWIW the BAM module...
http://cwiki.apache.org/CAMEL/bam.html

has JPA-based delay logic used to find expiration-based alerts of
business processes; it's in the ActivityMonitorEngine which in
pseudocode does...

    final Date timeNow = new Date(now);

    transactionTemplate.execute(new TransactionCallbackWithoutResult() {
        protected void doInTransactionWithoutResult(TransactionStatus status) {
            List<ActivityState> list = template.find(
                "select x from ActivityState x where x.timeOverdue < ?1", timeNow);
            for (ActivityState activityState : list) {
                process(activityState);
            }
        }
    });


So I guess a JPA based implementation of the DelayedQueue would be
pretty easy to do; the hardest bit is figuring out how to persist the
message headers & body cleanly with JPA (but then that'd be a handy
thing anyway).



> Then you have one thread that just monitors this DelayedQueue and
> sends any exchange that is retrieved from this queue (very similar to
> StreamResequencer).
>
> This way we can create Delayer that will not block current thread.

Sounds cool to me.
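
For what it's worth, here is a rough sketch of that non-blocking idea
using the JDK's java.util.concurrent.DelayQueue. It is only an
illustration under some assumptions: it carries plain String payloads
rather than Camel Exchanges, and the names DelayedDispatchSketch,
DelayedItem and dispatch are made up for this example; none of them
are Camel API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedDispatchSketch {

    /** Hypothetical wrapper: pairs a payload with the instant it becomes due. */
    static final class DelayedItem<T> implements Delayed {
        private final T payload;
        private final long dueAtNanos;

        DelayedItem(T payload, long delayMillis) {
            this.payload = payload;
            this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        T getPayload() {
            return payload;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until due; zero or negative means "ready now".
            return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other instanceof DelayedItem) {
                // Same type: compare the fixed due instants directly.
                return Long.compare(dueAtNanos, ((DelayedItem<?>) other).dueAtNanos);
            }
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /**
     * Starts one consumer thread that takes 'expected' items as they become due.
     * The join() is only here so the demo can return the delivery order;
     * a real producer would not wait for the consumer.
     */
    static List<String> dispatch(DelayQueue<DelayedItem<String>> queue, int expected) {
        List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    // take() blocks only this monitor thread until the head is due.
                    delivered.add(queue.take().getPayload());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return delivered;
    }

    public static void main(String[] args) {
        DelayQueue<DelayedItem<String>> queue = new DelayQueue<>();
        // offer() returns immediately, so the sending thread is never delayed.
        queue.offer(new DelayedItem<>("slow", 200));
        queue.offer(new DelayedItem<>("fast", 50));
        System.out.println(dispatch(queue, 2)); // prints [fast, slow]
    }
}
```

The sender only pays for an offer(); all the waiting happens in the one
monitor thread. Note this in-memory version has exactly the
reliability problem mentioned above: anything still queued is gone if
the JVM stops.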


> On the other hand we can create this strategy in a way that on every
> new exchange it will remove pending exchange from DelayedQueue, modify
> it (using some AggregatorStrategy) and put it once again to this
> Queue. If it will notice that aggregation is complete it will add
> aggregated Exchange to this queue with delay == 0.
>
> Logic to specify correlationId, default delay or batch size can be
> exposed as an Expression.
>
> I've already started to code it this way so if you have any comments
> for this then let me know.
> I hope that if it will be finished it could be included in camel distribution.

Great! Looking forward to your patch :)
-- 
James
-------
http://macstrac.blogspot.com/

Open Source SOA
http://open.iona.com
