On 05/11/2007, almilo <[EMAIL PROTECTED]> wrote:
> James.Strachan wrote:
> >
> > BTW Alberto I've added your test case to the distro so folks can
> > noodle on it and see if we can improve things some. See
> > AlbertoAggregatorTest in the camel-core module.
> >
>
> Many thanks, the example is a little strange but fun (hope so... :O)
Yeah :)
> James.Strachan wrote:
> >
> > The tricky thing is knowing when you're at the end of the batch
> > really. One simple solution would be to add some kinda predicate to
> > detect batch-completion. For example when we split messages we could
> > record how many split messages there are and each message's counter so
> > that we know when we've aggregated them all together again?
> >
>
> I reviewed all the other replies and they seem too complicated for me... (I'm new to
> Camel and I chose it for its simplicity = patterns + POJOs :)
:)
We definitely should strive to make things as simple as they possibly can be.
> I was thinking more along the lines of the simple 10,000-foot view that you pointed out
> above:
> - Because this is a "patterns" project, I think that when "splitting"
> messages it would be nice to count or, better, correlate the pieces all together.
> Probably when there's a "split", later there is a "join".
Yeah. I've recently patched the splitter to keep track of each
message's counter and the total number of messages split. Though we
don't as yet assign a unique message ID to be used for correlating
later on. I guess we could do that?
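To make the batch-completion idea concrete, here is a rough sketch in plain Java (no Camel types). SplitPiece, BatchCompletionTracker and the field names are made up for illustration; they are not the splitter's actual headers, and the correlation ID is assumed to exist even though we don't assign one yet.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical view of one split message (not a Camel class).
class SplitPiece {
    final String correlationId; // shared by all pieces of one original message
    final int index;            // this piece's counter, starting at 0
    final int total;            // how many pieces the original was split into

    SplitPiece(String correlationId, int index, int total) {
        this.correlationId = correlationId;
        this.index = index;
        this.total = total;
    }
}

// Remembers which pieces of each batch have arrived so far.
class BatchCompletionTracker {
    private final Map<String, Set<Integer>> seen = new HashMap<>();

    // Records a piece; returns true once every piece of its batch has arrived.
    boolean add(SplitPiece piece) {
        Set<Integer> indexes =
                seen.computeIfAbsent(piece.correlationId, k -> new HashSet<>());
        indexes.add(piece.index); // a Set, so a redelivered piece is harmless
        if (indexes.size() < piece.total) {
            return false;
        }
        seen.remove(piece.correlationId); // batch done, drop its state
        return true;
    }
}
```

The tracker only needs the counter, the total and something to correlate on, which is why the missing unique ID matters.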
> - I understood that the batch processor is conservative and doesn't call
> AggregationStrategy until it is really needed. That is very nice to have, but
> would it be possible to make AggregationStrategy more like this?:
>
> // newExchange: exchange that fulfils the AggregationExpression and needs
> // to be processed
> // AggregationContext: managed space to store intermediate data
> // Returns: the resulting exchange when finished, null when not finished
> public Exchange aggregate(Exchange newExchange, AggregationContext context);
Just a thought - you can use an Exchange to hold any intermediate
data. So you can take the old and new exchange and create a totally
new empty exchange with some calculated values inside it if you like.
Or you could just apply the new exchange onto the old exchange (or
vice versa). This is why we made the strategy API take both new and
old exchanges and returns an exchange.
So I'm not sure if we need an AggregationContext per se - I think it'd
be easier to reuse stuff if we just used an Exchange (possibly the
same or possibly new) to store any state.
The one downside of this is that there's no easy way to return the
completed/not-completed state, unless we use a property / header.
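A minimal sketch of what I mean, under some loud assumptions: SimpleExchange is a stand-in for a Camel Exchange (just a body and a property map), the property names are invented, and the expected message count is assumed to be known up front. The strategy keeps its running state inside the exchange it returns and flags completion with a property.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for a Camel Exchange (NOT the real class): a body plus properties.
class SimpleExchange {
    final Map<String, Object> properties = new HashMap<>();
    final Object body;

    SimpleExchange(Object body) {
        this.body = body;
    }
}

// Sums integer bodies; all intermediate state lives in the returned exchange.
class SummingStrategy {
    // Property names are invented for this sketch.
    static final String COUNT = "aggregation.count";
    static final String COMPLETED = "aggregation.completed";

    private final int expected; // assumed known, e.g. recorded by the splitter

    SummingStrategy(int expected) {
        this.expected = expected;
    }

    // Same shape as the strategy API: old and new exchange in, one exchange out.
    SimpleExchange aggregate(SimpleExchange oldExchange, SimpleExchange newExchange) {
        // An exchange that has never been aggregated counts as one message.
        int oldCount = (Integer) oldExchange.properties.getOrDefault(COUNT, 1);
        int sum = (Integer) oldExchange.body + (Integer) newExchange.body;

        SimpleExchange result = new SimpleExchange(sum);
        result.properties.put(COUNT, oldCount + 1);
        result.properties.put(COMPLETED, oldCount + 1 >= expected);
        return result;
    }
}
```

It works, but whoever drives the strategy has to know to look at that property, which is the awkward part.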
So maybe we could combine approaches, using an AggregationState bean
which creates the response Exchange and also determines whether or not
the aggregation is complete. Then an implementor of a strategy could
just aggregate exchanges together and store that, or use a separate
exchange, or use totally separate state and just create the final
exchange dynamically etc.
So maybe something like
public interface AggregationState extends Processor {
    // default state
    boolean isCompleted();
    // what the aggregated exchange looks like
    Exchange getAggregatedExchange();
}
public interface AggregationStrategy {
    // called with the first message exchange received
    AggregationState createAggregationState(Exchange firstMessage);
}
Then the state is created on the first message exchange received;
after that the process() method is called on the AggregationState to
update itself, and once it's completed, getAggregatedExchange() is
used to determine what the aggregated exchange looks like?
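Here's one way that lifecycle might look, purely as a sketch. Exchange and Processor below are tiny stand-ins so the example runs on its own (the real Camel types are richer), and FixedSizeListStrategy is a hypothetical strategy that completes after a fixed number of messages. I've assumed the first message seeds the state and process() is only called for the later ones.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for Camel's Exchange and Processor, for illustration only.
class Exchange {
    final Object body;

    Exchange(Object body) {
        this.body = body;
    }
}

interface Processor {
    void process(Exchange exchange);
}

interface AggregationState extends Processor {
    boolean isCompleted();

    Exchange getAggregatedExchange();
}

interface AggregationStrategy {
    AggregationState createAggregationState(Exchange firstMessage);
}

// Hypothetical strategy: collect bodies until a fixed batch size is reached.
class FixedSizeListStrategy implements AggregationStrategy {
    private final int batchSize;

    FixedSizeListStrategy(int batchSize) {
        this.batchSize = batchSize;
    }

    public AggregationState createAggregationState(Exchange firstMessage) {
        final List<Object> bodies = new ArrayList<>();
        bodies.add(firstMessage.body); // assumption: the first message seeds the state

        return new AggregationState() {
            public void process(Exchange exchange) {
                bodies.add(exchange.body);
            }

            public boolean isCompleted() {
                return bodies.size() >= batchSize;
            }

            public Exchange getAggregatedExchange() {
                // Built on demand; the state itself is just the list of bodies.
                return new Exchange(new ArrayList<>(bodies));
            }
        };
    }
}
```

The aggregator would keep one AggregationState per correlation key, call process() for each later message, and send getAggregatedExchange() on once isCompleted() returns true.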
> Ideas:
> - When a Message is split, a "sourceMessage" property could be populated.
> The splitter correlates all new messages through this value. Then a default
> aggregation expression could be constructed.
The Aggregator assumes there's some expression used to correlate the
messages; but the Splitter doesn't necessarily attach a unique message
header to each split message. We should maybe add that?
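Something along these lines, maybe. This is a standalone sketch rather than the actual Splitter: messages are modelled as plain maps, the body is assumed to be a comma-separated string, and the header names are invented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

class CorrelatingSplitter {
    // Header names are made up for this sketch.
    static final String CORRELATION_ID = "split.correlationId";
    static final String INDEX = "split.index";
    static final String TOTAL = "split.total";

    // Splits a comma-separated body. Every piece carries the same generated
    // correlation ID, plus its own index and the total number of pieces.
    static List<Map<String, Object>> split(String body) {
        String[] parts = body.split(",");
        String correlationId = UUID.randomUUID().toString();

        List<Map<String, Object>> pieces = new ArrayList<>();
        for (int i = 0; i < parts.length; i++) {
            Map<String, Object> piece = new HashMap<>();
            piece.put("body", parts[i]);
            piece.put(CORRELATION_ID, correlationId);
            piece.put(INDEX, i);
            piece.put(TOTAL, parts.length);
            pieces.add(piece);
        }
        return pieces;
    }
}
```

With that in place, a default aggregation expression could simply read the correlation header.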
> This is obviously too simple
> and something more robust is needed, but it seems natural to split and join, and
> right now this needs a lot of "glue" headers or the like to be done. Parallel
> processing pipelines through POJOs and SEDA seem very easy to build with
> the Camel DSL and are very powerful.
>
> - Using the AggregationContext you can store partial results and, when
> finished (you control the batch), the new Exchange is built and returned.
> The current strategy also seems possible to implement as a specialization.
> Through the context it should also be possible to know when there are more messages
> pending and when the batch started, so you can keep up a rate of returning
> exchanges or wait a little bit more.
>
> - This context could be transient or persistent (configured by DSL or XML)
> and maybe bound to a transaction to survive crashes (UnitOfWork??). Could
> Camel manage that persistence? <- doesn't sound good for simplicity ;O)
Hopefully we could create a simple-to-use persistence option :). I guess a
persistent strategy could use dependency injection in Spring to deal
with the JPA stuff, so keeping Camel nice and simple?
Using transactions is also cool - the only problem is if you have
messages for different correlationIDs mixed up. e.g. imagine if you
split all messages into 2 pieces. Then for A we have A1, A2 and for B we
have B1, B2. The problem's gonna be if you receive them like A1, B1,
A2, C1, B2, D1, C2....
you're never gonna reach a boundary where you can commit without
possibly losing part of another aggregation. But I guess you could
put a resequencer in front so that order across correlationIDs is
ensured.
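To see why, here's a small plain-Java simulation (nothing Camel-specific; CommitBoundaryDemo is a made-up name). It assumes every correlationID is split into exactly 2 pieces and that the ID is the first character of the message name. After each arrival it counts how many aggregations are still half-finished; a safe commit point is anywhere that count is zero.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CommitBoundaryDemo {
    // For each arrival (e.g. "A1"), returns how many aggregations are still
    // incomplete afterwards, assuming each ID is split into exactly 2 pieces.
    static List<Integer> openAfterEach(List<String> arrivals) {
        Map<String, Integer> piecesSeen = new HashMap<>();
        List<Integer> open = new ArrayList<>();
        for (String message : arrivals) {
            String id = message.substring(0, 1);
            int seen = piecesSeen.merge(id, 1, Integer::sum);
            if (seen == 2) {
                piecesSeen.remove(id); // both pieces in, aggregation finished
            }
            open.add(piecesSeen.size());
        }
        return open;
    }
}
```

For the interleaved order A1, B1, A2, C1, B2, D1, C2 the counts are 1, 2, 1, 2, 1, 2, 1, so there is never a clean boundary. For the resequenced order A1, A2, B1, B2, C1, C2 they are 1, 0, 1, 0, 1, 0, so you could commit after every second message.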
> I feel that these suggestions have a lot of problems with the issues of
> concurrency and concepts in Camel. Am I wrong?
>
> James, if this sounds good maybe I'll implement a prototype and post it :O)
Great! :)
I'm still digesting all the recent thoughts on this topic - e.g. I
like the simplicity of Hiram's idea - I feel like we're making
progress & some great ideas are flowing. Hopefully we can end up with
some really powerful and flexible, yet simple to use, solutions.
--
James
-------
http://macstrac.blogspot.com/
Open Source SOA
http://open.iona.com