[ https://issues.apache.org/jira/browse/CAMEL-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aaron Whiteside updated CAMEL-6042:
-----------------------------------
Attachment: retry_policy_+_unit_tests.patch
Sorry for missing the 2.11 release; I was on vacation.
Please find attached a patch with support for a retry policy and more unit
tests.
> AggregateProcessor/AggregationRepository does not deal with optimistic
> locking - will not work correctly in a distributed environment
> -------------------------------------------------------------------------------------------------------------------------------------
>
> Key: CAMEL-6042
> URL: https://issues.apache.org/jira/browse/CAMEL-6042
> Project: Camel
> Issue Type: Improvement
> Components: camel-core
> Affects Versions: 2.10.3
> Environment: Glassfish + Gemini Blueprint + Spring 3.2
> Reporter: Aaron Whiteside
> Assignee: Claus Ibsen
> Fix For: 2.11.0
>
> Attachments: aggregate-optimistic-locking-support1.patch,
> aggregate-optimistic-locking-support2.patch,
> aggregate-optimistic-locking-support.patch,
> javadoc-and-unit-test-updates.patch, retry_policy_+_unit_tests.patch
>
>
> AggregateProcessor/AggregationRepository does not deal with optimistic
> locking - and will not work correctly in a distributed environment.
> When I started to write a Voldemort-specific AggregationRepository, I saw that
> the AggregateProcessor does not deal with optimistic locking. It uses a single
> lock that is specific to each AggregateProcessor instance.
> In a distributed environment where there are many Camel instances on many
> servers using a shared data store for the AggregationRepository this will not
> work.
> Consider the following scenario using a persistent/shared
> AggregationRepository:
> Camel instance A on server A receives Exchange 1:
> # AggregateProcessor first acquires instance specific lock.
> # AggregateProcessor calls oldExchange = AggregationRepository.get()
> # oldExchange is null
> Camel instance B on server B receives Exchange 2:
> # AggregateProcessor first acquires instance specific lock.
> # AggregateProcessor calls oldExchange = AggregationRepository.get()
> # oldExchange is null
> Camel instances A and B then both do the following at the same time:
> # AggregateProcessor calls AggregationStrategy with the new exchange and a
> null old exchange
> # aggregationRepository.add() is called with the result (the new exchange)
> # Camel instance A succeeds in storing the new exchange.
> # Camel instance B fails with an exception stating that something is already
> stored under that exchange id.
> ## At this point I could write my AggregationRepository implementation to
> ignore the existing entry and overwrite it, but this would mean the existing
> exchange is lost and never aggregated.
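
The overwrite variant from the last step can be replayed deterministically. The sketch below is an illustration only: a plain HashMap stands in for the shared data store, strings stand in for exchanges, and the class name LostUpdateSketch and the key "correlation-1" are made up for this example.

```java
import java.util.HashMap;
import java.util.Map;

class LostUpdateSketch {
    /** Replays the interleaving from the scenario with a repository that blindly overwrites. */
    static String replay() {
        Map<String, String> sharedStore = new HashMap<>(); // stand-in for the shared repository
        String key = "correlation-1";                      // hypothetical key

        // Both instances call get() before either has stored anything.
        String oldSeenByA = sharedStore.get(key); // null
        String oldSeenByB = sharedStore.get(key); // null

        // Each instance aggregates its new exchange with the (null) old one.
        String resultA = aggregate(oldSeenByA, "exchange1");
        String resultB = aggregate(oldSeenByB, "exchange2");

        // Instance A stores first; instance B then overwrites without checking.
        sharedStore.put(key, resultA);
        sharedStore.put(key, resultB);
        return sharedStore.get(key);
    }

    /** Trivial stand-in for an aggregation strategy: concatenates old and new. */
    static String aggregate(String oldValue, String newValue) {
        return oldValue == null ? newValue : oldValue + "+" + newValue;
    }
}
```

In this replay the store ends up holding only instance B's result, so exchange1 never takes part in the aggregation.
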
> A possible solution would be:
> a) Remove the lock from AggregateProcessor
> a1) Put the lock in the MemoryAggregationRepository or
> a2) Use a ConcurrentHashMap.putIfAbsent method (and then continue on to do B
> below).
> b) Introduce an AggregationRepositoryOptimisticLockException (name it
> whatever you want) that is thrown when an AggregationRepository detects that
> someone is trying to add() the same exchange id at the same time.
> Upon receiving this exception the AggregateProcessor would re-get() the
> oldExchange (now not null) from the AggregationRepository and call the
> AggregationStrategy again to aggregate the old and the new exchanges.
> This would ensure that no exchanges fail to aggregate in a distributed
> environment, provided that the underlying AggregationRepository is able to
> detect concurrent add() calls, which most should be able to do (using
> conditional updates).
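
Steps a2) and b) above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the attached patch and not Camel's API: strings stand in for exchanges, SketchRepository and SketchAggregator are hypothetical names, the exception name is simply the one proposed in this issue, and add() takes the previously read value so that it can do a conditional update.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BinaryOperator;

/** Hypothetical exception, named after the proposal in this issue. */
class AggregationRepositoryOptimisticLockException extends RuntimeException {
    AggregationRepositoryOptimisticLockException(String message) {
        super(message);
    }
}

/** Simplified stand-in for a repository that detects concurrent add() calls. */
class SketchRepository {
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    String get(String key) {
        return store.get(key);
    }

    /** Stores newValue only if the entry still equals expectedOld (null means absent). */
    void add(String key, String expectedOld, String newValue) {
        boolean stored = (expectedOld == null)
                ? store.putIfAbsent(key, newValue) == null
                : store.replace(key, expectedOld, newValue);
        if (!stored) {
            throw new AggregationRepositoryOptimisticLockException(
                    "concurrent update detected for " + key);
        }
    }
}

class SketchAggregator {
    /** Re-reads and re-aggregates until the conditional add() succeeds. */
    static String aggregate(SketchRepository repo, String key, String incoming,
                            BinaryOperator<String> strategy) {
        while (true) {
            String old = repo.get(key);
            String merged = (old == null) ? incoming : strategy.apply(old, incoming);
            try {
                repo.add(key, old, merged);
                return merged;
            } catch (AggregationRepositoryOptimisticLockException e) {
                // Another instance stored first; loop to re-get() and aggregate again.
            }
        }
    }
}
```

The loop retries without limit or delay to keep the sketch short; a bounded retry policy with a back-off would be a natural refinement.
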
> For example:
> An SQL-based repository could try to insert into a table with a unique
> constraint on the correlation id. When the constraint is violated,
> JPA/JDBC/whatever will throw a unique constraint violation exception, which
> can be converted into an AggregationRepositoryOptimisticLockException.
> HawtDB supports optimistic locking out of the box, by throwing an
> OptimisticUpdateException when it detects concurrent updates. So updating
> this component to take advantage of this feature should be very simple.
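
For the SQL example above, the conversion could look roughly like the following sketch when plain JDBC is used. The names DuplicateKeyConflictException, SqlInsert and ConstraintTranslator are hypothetical and exist only for this illustration. The check relies on java.sql.SQLIntegrityConstraintViolationException and on SQLSTATE class 23 (integrity constraint violation); whether a given driver reports a unique-key clash in one of these two ways is an assumption to verify against that driver.

```java
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

/** Hypothetical stand-in for the optimistic-lock exception proposed in this issue. */
class DuplicateKeyConflictException extends RuntimeException {
    DuplicateKeyConflictException(Throwable cause) {
        super(cause);
    }
}

/** Hypothetical callback wrapping the INSERT statement of a JDBC-backed repository. */
interface SqlInsert {
    void run() throws SQLException;
}

class ConstraintTranslator {
    /** Runs the insert and turns a constraint violation into a conflict signal. */
    static void insertOrSignalConflict(SqlInsert insert) {
        try {
            insert.run();
        } catch (SQLException e) {
            if (isConstraintViolation(e)) {
                // Someone else already inserted a row for this correlation id.
                throw new DuplicateKeyConflictException(e);
            }
            // Any other SQL failure is not a locking conflict.
            throw new IllegalStateException(e);
        }
    }

    /** True for the dedicated JDBC subclass or for SQLSTATE class 23. */
    static boolean isConstraintViolation(SQLException e) {
        String state = e.getSQLState();
        return e instanceof SQLIntegrityConstraintViolationException
                || (state != null && state.startsWith("23"));
    }
}
```

The caller would catch the conflict exception, re-read the stored exchange and aggregate again, as described in step b).
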