[ 
https://issues.apache.org/jira/browse/CAMEL-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Whiteside updated CAMEL-6042:
-----------------------------------

    Attachment: retry_policy_+_unit_tests.patch

Sorry for missing the 2.11 release; I was on vacation.

Please find attached a patch with support for a retry policy and more unit 
tests.
                
> AggregateProcessor/AggregationRepository does not deal with optimistic 
> locking - will not work correctly in a distributed environment
> -------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-6042
>                 URL: https://issues.apache.org/jira/browse/CAMEL-6042
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-core
>    Affects Versions: 2.10.3
>         Environment: Glassfish + Gemini Blueprint + Spring 3.2
>            Reporter: Aaron Whiteside
>            Assignee: Claus Ibsen
>             Fix For: 2.11.0
>
>         Attachments: aggregate-optimistic-locking-support1.patch, 
> aggregate-optimistic-locking-support2.patch, 
> aggregate-optimistic-locking-support.patch, 
> javadoc-and-unit-test-updates.patch, retry_policy_+_unit_tests.patch
>
>
> AggregateProcessor/AggregationRepository does not deal with optimistic 
> locking - and will not work correctly in a distributed environment.
> When I started to write a Voldemort-specific AggregationRepository, I saw 
> that the AggregateProcessor does not deal with optimistic locking. It uses a 
> single lock that is specific to the AggregateProcessor instance.
> In a distributed environment where there are many Camel instances on many 
> servers using a shared data store for the AggregationRepository this will not 
> work.
> Consider the following scenario using a persistent/shared 
> AggregationRepository:
> Camel instance A on server A receives Exchange 1:
> # AggregateProcessor first acquires instance specific lock.
> # AggregateProcessor calls oldExchange = AggregationRepository.get()
> # oldExchange is null
> Camel instance B on server B receives Exchange 2:
> # AggregateProcessor first acquires instance specific lock.
> # AggregateProcessor calls oldExchange = AggregationRepository.get()
> # oldExchange is null
> Camel instances A and B then both do the following at the same time:
> # AggregateProcessor calls AggregationStrategy with the new exchange and old 
> null exchange
> # aggregationRepository.add() with the result (the new exchange)
> # Camel instance A succeeds in storing the new exchange.
> # Camel instance B fails with an exception stating that something is already 
> stored using that exchange id.
> ## At this point I could write my AggregationRepository implementation to 
> ignore the existing entry and overwrite it. But this would mean the existing 
> exchange is lost and never aggregated.
> A possible solution would be:
> a) Remove the lock from AggregateProcessor 
>  a1) Put the lock in the MemoryAggregationRepository or 
>  a2) Use a ConcurrentHashMap.putIfAbsent method (and then continue on to do B 
> below).
> b) Introduce an AggregationRepositoryOptimisticLockException (name it 
> whatever you want) that is thrown when an AggregationRepository detects that 
> someone is trying to add() the same exchange id at the same time.
> Upon receiving this exception the AggregateProcessor would re-get() the 
> oldExchange (now not null) from the AggregationRepository and call the 
> AggregationStrategy again to aggregate the old and the new exchanges.
> This would ensure that no exchanges fail to aggregate in a distributed 
> environment, provided that the underlying AggregationRepository is able to 
> detect concurrent add() calls, which most should be able to do (using 
> conditional updates).
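
The flow proposed in (a2) and (b) can be sketched roughly as follows. This is a
minimal illustration only, not the attached patch: the class and method names
(OptimisticAggregationSketch, InMemoryRepository, OptimisticLockException,
aggregate) are hypothetical, plain strings stand in for exchanges, and the
aggregation strategy is skipped when there is no old value. The retry limit is
likewise an assumption.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BinaryOperator;

public class OptimisticAggregationSketch {

    /** Hypothetical exception, standing in for the one proposed in (b). */
    static class OptimisticLockException extends RuntimeException {
        OptimisticLockException(String key) {
            super("Concurrent update detected for key " + key);
        }
    }

    /** Hypothetical repository: add() succeeds only if the stored value is still oldValue. */
    static class InMemoryRepository {
        private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

        String get(String key) {
            return store.get(key);
        }

        void add(String key, String oldValue, String newValue) {
            // Conditional update: insert if absent, or replace only if unchanged.
            boolean stored = (oldValue == null)
                    ? store.putIfAbsent(key, newValue) == null
                    : store.replace(key, oldValue, newValue);
            if (!stored) {
                throw new OptimisticLockException(key);
            }
        }
    }

    /** Re-reads and re-aggregates until add() wins the race or attempts run out. */
    static String aggregate(InMemoryRepository repo, String key, String incoming,
                            BinaryOperator<String> strategy, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            String old = repo.get(key);
            String merged = (old == null) ? incoming : strategy.apply(old, incoming);
            try {
                repo.add(key, old, merged);
                return merged;
            } catch (OptimisticLockException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up; the caller decides what to do next
                }
                // otherwise loop: re-get() the now-present value and aggregate again
            }
        }
    }
}
```

With this shape, the instance that loses the race does not overwrite the stored
value; it re-reads it and aggregates again, so neither exchange is dropped.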
> For example:
> A SQL-based repository could try to insert into a table with a unique 
> constraint on the correlation id. When the constraint is violated, 
> JPA/JDBC/whatever will throw a unique constraint violation exception, which 
> can be converted into an AggregationRepositoryOptimisticLockException.
> HawtDB supports optimistic locking out of the box, by throwing an 
> OptimisticUpdateException when it detects concurrent updates. So updating 
> this component to take advantage of this feature should be very simple.
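
For the SQL case, the conversion could look something like the sketch below. It
uses only java.sql types; ConstraintViolationTranslator, SqlInsert,
OptimisticLockException and insertOrSignalConflict are hypothetical names made
up for illustration. Note one assumption: SQLState class "23" covers every
integrity constraint violation, not only unique constraints, so a real
implementation may need a vendor-specific check.

```java
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

public class ConstraintViolationTranslator {

    /** Hypothetical exception, standing in for the one proposed in the issue. */
    static class OptimisticLockException extends RuntimeException {
        OptimisticLockException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical callback that performs the INSERT and may fail with SQLException. */
    interface SqlInsert {
        void run() throws SQLException;
    }

    /** True for the JDBC subclass or for SQLState class "23" (integrity constraint violation). */
    static boolean isConstraintViolation(SQLException e) {
        String state = e.getSQLState();
        return e instanceof SQLIntegrityConstraintViolationException
                || (state != null && state.startsWith("23"));
    }

    /** Runs the insert and turns a constraint violation into an optimistic-lock signal. */
    static void insertOrSignalConflict(String correlationId, SqlInsert insert) {
        try {
            insert.run();
        } catch (SQLException e) {
            if (isConstraintViolation(e)) {
                throw new OptimisticLockException(
                        "Concurrent add for correlation id " + correlationId, e);
            }
            // Any other SQL failure is not a locking conflict; surface it as-is.
            throw new IllegalStateException(
                    "Insert failed for correlation id " + correlationId, e);
        }
    }
}
```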

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
