Hi,

I have created a Jira issue here
https://issues.apache.org/jira/browse/CAMEL-20103 including a potential fix,
and also a pull request here https://github.com/apache/camel/pull/11971
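
To illustrate the idea without the Camel classes, here is a minimal,
self-contained sketch of one possible way to make such a "last used" fast
path safe. It is not necessarily what the pull request does. LastUsedSlot,
LastUsedSlotDemo and countMismatches are hypothetical names made up for this
example, a plain Function stands in for the real producer lookup, and the
isSingletonProducer() check is left out. The point is that the key and the
value are published together as one immutable pair behind a single volatile
reference, so a reader can never combine the endpoint from one write with the
producer from another.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical stand-in for the "last used" fast path; not Camel's actual class.
final class LastUsedSlot<K, V> {

    // Immutable pair: key and value always belong to the same write.
    private static final class Entry<K, V> {
        final K key;
        final V value;

        Entry(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for the real (slower) lookup, e.g. producers.acquire(endpoint).
    private final Function<K, V> loader;

    // One volatile reference replaces the two separately written fields.
    private volatile Entry<K, V> last;

    LastUsedSlot(Function<K, V> loader) {
        this.loader = loader;
    }

    V acquire(K key) {
        // Read the reference once; key and value come from the same snapshot.
        Entry<K, V> snapshot = last;
        if (snapshot != null && snapshot.key == key) {
            return snapshot.value;
        }
        V value = loader.apply(key);
        last = new Entry<>(key, value);
        return value;
    }
}

final class LastUsedSlotDemo {

    // Runs many concurrent acquires and counts results that do not match the key.
    static int countMismatches(int threads, int tasks) {
        List<String> keys = Arrays.asList("direct:queue:0", "direct:queue:1", "direct:queue:2");
        LastUsedSlot<String, String> slot = new LastUsedSlot<>(k -> "producer for " + k);

        List<Callable<Boolean>> callables = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            String key = keys.get(i % keys.size());
            callables.add(() -> slot.acquire(key).equals("producer for " + key));
        }

        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            int mismatches = 0;
            for (Future<Boolean> result : executor.invokeAll(callables)) {
                if (!result.get()) {
                    mismatches++;
                }
            }
            return mismatches;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Mismatches: " + countMismatches(16, 50_000));
    }
}
```

Compared with synchronizing both the read and the write, this keeps the fast
path lock-free at the cost of one small allocation per cache miss. I have not
measured which variant performs better.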

Best regards,
Peter

On Fri, 10 Nov 2023 at 16:36, Otavio Rodolfo Piske <
angusyo...@gmail.com> wrote:

> Hi, I am interested in investigating this.
>
> Please, can you create a ticket [1] on our Jira and attach the reproducer?
>
> [1] https://camel.apache.org/camel-core/contributing/#_reporting_a_bug_or_problem
>
> Kind regards
>
> On Fri, Nov 10, 2023 at 3:50 PM Peter Nowak <peter.no...@ecosio.com.invalid> wrote:
>
> > Hi all,
> >
> > I stumbled across a potentially critical concurrency bug in the
> > DefaultProducerCache of Camel 4.x under high (concurrent) load. It is
> > related to the change from this ticket
> > https://issues.apache.org/jira/browse/CAMEL-19058 and this commit
> > https://github.com/apache/camel/commit/921ce519331ac5c8b0a1dfd099f9acbaba4feeab
> >
> > After upgrading to Camel 4.0 we encountered strange misrouted exchanges
> > (and therefore errors) in high-load scenarios: wrong producers were
> > picked from the cache due to the lack of synchronization.
> >
> > I have created a small unit test which shows the problem (I placed it
> > in the "DefaultProducerCacheTest" class; it is just a proof of concept,
> > not cleaned up at all :D)
> > ---------------
> >
> > @Test
> > public void testAcquireConcurrencyIssues() throws InterruptedException, ExecutionException {
> >     DefaultProducerCache cache = new DefaultProducerCache(this, context, 0);
> >     cache.start();
> >
> >     // Warm the cache with one producer per endpoint
> >     List<Endpoint> endpoints = new ArrayList<>();
> >     for (int i = 0; i < 3; i++) {
> >         Endpoint e = context.getEndpoint("direct:queue:" + i);
> >         cache.acquireProducer(e);
> >         endpoints.add(e);
> >     }
> >
> >     assertEquals(3, cache.size());
> >
> >     ExecutorService ex = Executors.newFixedThreadPool(16);
> >
> >     // Each task acquires a producer and checks that it belongs to the requested endpoint
> >     List<Callable<Boolean>> callables = new ArrayList<>();
> >     for (int i = 0; i < 500; i++) {
> >         int index = i % 3;
> >         callables.add(() -> {
> >             Endpoint requested = endpoints.get(index);
> >             Producer producer = cache.acquireProducer(requested);
> >             boolean isEqual = producer.getEndpoint().getEndpointUri()
> >                     .equalsIgnoreCase(requested.getEndpointUri());
> >
> >             if (!isEqual) {
> >                 System.out.println("Endpoint uri to acquire: " + requested.getEndpointUri()
> >                         + ", returned producer (uri): " + producer.getEndpoint().getEndpointUri());
> >             }
> >
> >             return isEqual;
> >         });
> >     }
> >
> >     try {
> >         // Run the whole batch repeatedly to make the race likely to show up
> >         for (int i = 0; i < 100; i++) {
> >             System.out.println("Iteration: " + (i + 1));
> >             List<Future<Boolean>> results = ex.invokeAll(callables);
> >             for (Future<Boolean> future : results) {
> >                 assertEquals(true, future.get());
> >             }
> >         }
> >     } finally {
> >         ex.shutdown();
> >     }
> > }
> > ---------------
> >
> > It fails on my machine 10 out of 10 times.
> >
> > If I synchronize the read in the "acquireProducer" method of
> > "DefaultProducerCache" (also just an ugly change for testing purposes,
> > no performance testing done), the test is green every time.
> >
> > Changes in the "DefaultProducerCache":
> > -----------------
> >
> > @Override
> > public AsyncProducer acquireProducer(Endpoint endpoint) {
> >     // Try to favor thread locality as some data in the producer's cache
> >     // may be shared among threads, triggering cases of false sharing
> >     synchronized (this) {
> >         if (endpoint == lastUsedEndpoint && endpoint.isSingletonProducer()) {
> >             return lastUsedProducer;
> >         }
> >     }
> >
> >     try {
> >         AsyncProducer producer = producers.acquire(endpoint);
> >         if (statistics != null) {
> >             statistics.onHit(endpoint.getEndpointUri());
> >         }
> >
> >         synchronized (this) {
> >             lastUsedEndpoint = endpoint;
> >             lastUsedProducer = producer;
> >         }
> >
> >         return producer;
> >     } catch (Exception e) {
> >         throw new FailedToCreateProducerException(endpoint, e);
> >     }
> > }
> > -----------------
> >
> > With the current approach of synchronizing only the write to the
> > variables lastUsedEndpoint/lastUsedProducer, other threads can still
> > read the variables while they are being updated and then get back a
> > producer that does not match the requested endpoint.
> >
> > Best regards,
> > Peter Nowak
> >
> > --
> > Peter Nowak
> > peter.no...@ecosio.com
> > ecosio GmbH
> > Lange Gasse 30 | 1080 Wien | Austria
> > VAT number: ATU68241501, FN 405017p, Commercial Court Vienna
> > Managing Directors: Christoph Ebm, Philipp Liegl, Marco Zapletal
> >
>
>
> --
> Otavio R. Piske
> http://orpiske.net
>


