Hi all, I have written a consumer-only component, which I would like to contribute, that combines aggregation logic with transacted JMS sessions. Compared with using a regular JMS component and aggregator, it vastly speeds up message consumption and aggregation, without message loss on failure.
The problem it solves is this: when you want to aggregate a set of messages from JMS and avoid message loss, you typically reach for a JdbcAggregationRepository. This in turn fetches and writes progressively larger blobs from the database on receipt of each message, so the cost per message grows linearly with the number of messages consumed so far, i.e. it performs progressively worse the larger the batch.

Old way:

    from("jms:myQueue")
        .transacted()
        .aggregate(constant(true), myAggStrategy)
            .aggregationRepository(jdbcAggregationRepository)
            .completionSize(100)
            .completionTimeout(500)

This also suffers from the problem that message loss is still possible between the message broker and the database that stores the aggregated message (unless you use XA transactions...).

The component that I have developed starts a JMS session and receives messages synchronously until it reaches a completion size, or until a completion timeout expires, calling an AggregationStrategy for each message. Only when one of the completion conditions has been met does it emit the aggregated message. The component commits the batch transaction if the Exchange is processed successfully, or rolls the entire thing back on exception, so all of the original messages end up back on the queue for re-processing. If the Camel process itself fails, the messages remain on the broker for re-dispatch. So in terms of "where is my data stored?", the answer is that it remains on the broker until the batch is successfully processed.

New way:

    from("aggjms:myQueue?completionSize=100&completionTimeout=500&aggregationStrategy=#myAggStrategy")

The component also allows for setting the number of JMS consumers on the endpoint, so you can scale out the number of threads that pick up batches.

The transactional behaviour of this (and so its usage) is so different from the regular JMS and SJMS components that I believe it needs to be its own component, as opposed to being integrated into one of the others.
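
To make the batch semantics described above a bit more concrete (receive until size or timeout, aggregate as you go, then commit or roll back the whole batch), here is a rough plain-Java model. It is not the component's code and uses no JMS or Camel API: a BlockingQueue stands in for the broker queue, a BinaryOperator for the AggregationStrategy, and putting messages back on the queue stands in for a session rollback. The class and method names are hypothetical, and it assumes the completion timeout is measured from the start of the batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;

// Hypothetical model of the batch-consume loop; not the actual component.
final class BatchConsumerSketch {

    /**
     * Consumes one batch from the queue.
     * Returns true if the batch was "committed" (or was empty),
     * false if it was "rolled back" and the messages were re-queued.
     */
    static boolean consumeBatch(BlockingQueue<String> queue,
                                int completionSize,
                                long completionTimeoutMillis,
                                BinaryOperator<String> aggregationStrategy,
                                Consumer<String> processor) {
        List<String> received = new ArrayList<>();
        String aggregated = null;
        // Assumption: the timeout runs from the start of the batch.
        long deadline = System.nanoTime()
                + TimeUnit.MILLISECONDS.toNanos(completionTimeoutMillis);
        try {
            while (received.size() < completionSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // completion timeout reached
                }
                String message = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (message == null) {
                    break; // nothing arrived before the timeout
                }
                received.add(message);
                // Aggregate incrementally, once per received message.
                aggregated = (aggregated == null)
                        ? message
                        : aggregationStrategy.apply(aggregated, message);
            }
            if (!received.isEmpty()) {
                processor.accept(aggregated); // emit the aggregated message
            }
            return true; // "commit": the messages are gone from the queue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            queue.addAll(received); // "rollback" on interruption
            return false;
        } catch (RuntimeException e) {
            // "rollback": every message in the batch goes back on the queue.
            // (A real broker controls redelivery order; here they go to the tail.)
            queue.addAll(received);
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue =
                new LinkedBlockingQueue<>(List.of("a", "b", "c"));
        boolean committed = consumeBatch(queue, 2, 200,
                (left, right) -> left + "," + right,
                batch -> System.out.println("batch: " + batch));
        System.out.println("committed=" + committed
                + ", left on queue=" + queue.size());
    }
}
```

The point of the model is only that nothing is stored outside the queue while a batch is in flight: either the processor succeeds and the batch is gone, or it throws and every message is available again.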
I would like to contribute this to Camel. What is the process for doing this? Thanks, Jakub