Hi, i have created a JIRA issue here https://issues.apache.org/jira/browse/CAMEL-20103 including a potential fix and also a pull request here https://github.com/apache/camel/pull/11971
Best regards, Peter Am Fr., 10. Nov. 2023 um 16:36 Uhr schrieb Otavio Rodolfo Piske < angusyo...@gmail.com>: > Hi, I am interested in investigating this. > > Please, can you create a ticket [1] on our Jira and attach the reproducer? > > 1. > > https://camel.apache.org/camel-core/contributing/#_reporting_a_bug_or_problem > > Kind regards > > On Fri, Nov 10, 2023 at 3:50 PM Peter Nowak <peter.no...@ecosio.com > .invalid> > wrote: > > > Hi all, > > > > i stumbled across a maybe critical concurrency bug in the > > DefaultProducerCache of camel 4.x under high (concurrent) load related > > to the change from this ticket > > https://issues.apache.org/jira/browse/CAMEL-19058 and this commit > > > > > https://github.com/apache/camel/commit/921ce519331ac5c8b0a1dfd099f9acbaba4feeab > > > > We encountered strange misrouted exchanges (and therefor errors) after > > upgrading to camel 4.0 under high load scenarios - wrong producer > > templates were picked from the cache due to the lack of > > synchronization > > > > I have created a small unit test which show the problem (i placed it > > in the "DefaultProducerCacheTest" class - just a proof of concept, not > > cleaned up at all :D) > > --------------- > > > > @Test > > public void testAcquireConcurrencyIssues() throws > > InterruptedException, ExecutionException { > > DefaultProducerCache cache = new DefaultProducerCache(this, context, > > 0); > > cache.start(); > > List<Endpoint> endpoints = new ArrayList<>(); > > for (int i = 0; i < 3; i++) { > > Endpoint e = context.getEndpoint("direct:queue:" + i); > > AsyncProducer p = cache.acquireProducer(e); > > endpoints.add(e); > > } > > > > assertEquals(3, cache.size()); > > > > ExecutorService ex = Executors.newFixedThreadPool(16); > > > > List<Callable<Boolean>> callables = new ArrayList<>(); > > > > for(int i = 0; i < 500; i++) { > > int index = i % 3; > > callables.add(() -> { > > Producer producer = > > cache.acquireProducer(endpoints.get(index)); > > boolean isEqual = > > > > > producer.getEndpoint().getEndpointUri().equalsIgnoreCase(endpoints.get(index).getEndpointUri()); > > > > if(!isEqual) { > > System.out.println("Endpoint uri to acquire: " + > > endpoints.get(index).getEndpointUri() + ", returned producer (uri): " > > + producer.getEndpoint().getEndpointUri()); > > } > > > > return isEqual; > > }); > > } > > > > for (int i = 0; i < 100; i++) { > > System.out.println("Iteration: " + (i + 1)); > > List<Future<Boolean>> results = ex.invokeAll(callables); > > for (Future<Boolean> future : results) { > > assertEquals(true, future.get()); > > } > > } > > } > > --------------- > > > > Fails on my machine 10 out of 10 > > > > if i synchronize the read (also just ugly for testing purposes, no > > performance testing done) in the "acquire" method in the > > "DefaultProducerCache" the test is green every time: > > > > Changes in the "DefaultProducerCache": > > ----------------- > > > > @Override > > public AsyncProducer acquireProducer(Endpoint endpoint) { > > // Try to favor thread locality as some data in the producer's > > cache may be shared among threads, > > // triggering cases of false sharing > > synchronized (this) { > > if (endpoint == lastUsedEndpoint && > > endpoint.isSingletonProducer()) { > > return lastUsedProducer; > > } > > } > > > > try { > > AsyncProducer producer = producers.acquire(endpoint); > > if (statistics != null) { > > statistics.onHit(endpoint.getEndpointUri()); > > } > > > > synchronized (this) { > > lastUsedEndpoint = endpoint; > > lastUsedProducer = producer; > > } > > > > return producer; > > } catch (Exception e) { > > throw new FailedToCreateProducerException(endpoint, e); > > } > > } > > ----------------- > > > > With the current approach of just syncing the write to the variables > > lastUsedEndpoint/Producer, still other threads can read the variables > > in between and then get a wrong producer template returned not > > matching to the requested endpoint. > > > > Best regards, > > Peter Nowak > > > > -- > > Peter Nowak > > peter.no...@ecosio.com > > ecosio GmbH > > Lange Gasse 30 | 1080 Wien | Austria > > VAT number: ATU68241501, FN 405017p, Commercial Court Vienna > > Managing Directors: Christoph Ebm, Philipp Liegl, Marco Zapletal > > > > > -- > Otavio R. Piske > http://orpiske.net > -- Peter Nowak peter.no...@ecosio.com ecosio GmbH Lange Gasse 30 | 1080 Wien | Austria <https://www.linkedin.com/company/ecosiohq/> <https://www.instagram.com/ecosiohq/> <https://www.facebook.com/ecosioHQ/> <https://twitter.com/ecosiohq> <https://ecosio.com/en/webinars?utm_source=email-banner&utm_medium=email&utm_id=2023_webinars> VAT number: ATU68241501, FN 405017p, Commercial Court Vienna Managing Directors: Christoph Ebm, Philipp Liegl, Marco Zapletal