This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 67b444d8ddc CAMEL-20103: fix the concurrency problem acquiring a producer
template from the DefaultProducerCache
67b444d8ddc is described below
commit 67b444d8ddc46c89d54169ece756a503388b151c
Author: pnowak85 <[email protected]>
AuthorDate: Tue Nov 14 10:10:33 2023 +0100
CAMEL-20103: fix the concurrency problem acquiring a producer template from
the DefaultProducerCache
Fix the concurrency problem when acquiring a producer template from the
DefaultProducerCache, which can lead to the wrong producer being returned under
high load (#11971)
---
.../camel/impl/DefaultProducerCacheTest.java | 47 ++++++++++++++++++++++
.../camel/support/cache/DefaultProducerCache.java | 12 +++---
2 files changed, 52 insertions(+), 7 deletions(-)
diff --git
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
index e3b394a6bfd..bb82f409135 100644
---
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
@@ -16,7 +16,14 @@
*/
package org.apache.camel.impl;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -208,6 +215,46 @@ public class DefaultProducerCacheTest extends
ContextTestSupport {
await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(3, stopCounter.get()));
}
+ @Test
+ public void testAcquireProducerConcurrency() throws InterruptedException,
ExecutionException {
+ DefaultProducerCache cache = new DefaultProducerCache(this, context,
0);
+ cache.start();
+ List<Endpoint> endpoints = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ Endpoint e = context.getEndpoint("direct:queue:" + i);
+ AsyncProducer p = cache.acquireProducer(e);
+ endpoints.add(e);
+ }
+
+ assertEquals(3, cache.size());
+
+ ExecutorService ex = Executors.newFixedThreadPool(16);
+
+ List<Callable<Boolean>> callables = new ArrayList<>();
+
+ for(int i = 0; i < 500; i++) {
+ int index = i % 3;
+ callables.add(() -> {
+ Producer producer =
cache.acquireProducer(endpoints.get(index));
+ boolean isEqual =
producer.getEndpoint().getEndpointUri().equalsIgnoreCase(endpoints.get(index).getEndpointUri());
+
+ if(!isEqual) {
+ log.info("Endpoint uri to acquire: " +
endpoints.get(index).getEndpointUri() + ", returned producer (uri): " +
producer.getEndpoint().getEndpointUri());
+ }
+
+ return isEqual;
+ });
+ }
+
+ for (int i = 1; i <= 100; i++) {
+ log.info("Iteration: {}", i);
+ List<Future<Boolean>> results = ex.invokeAll(callables);
+ for (Future<Boolean> future : results) {
+ assertEquals(true, future.get());
+ }
+ }
+ }
+
private static class MyProducerCache extends DefaultProducerCache {
private MyServicePool myServicePool;
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
index 0cb817aa451..c156ef2b108 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
@@ -59,7 +59,6 @@ public class DefaultProducerCache extends ServiceSupport
implements ProducerCach
private boolean extendedStatistics;
private final int maxCacheSize;
- private Endpoint lastUsedEndpoint;
private AsyncProducer lastUsedProducer;
public DefaultProducerCache(Object source, CamelContext camelContext, int
cacheSize) {
@@ -125,8 +124,10 @@ public class DefaultProducerCache extends ServiceSupport
implements ProducerCach
public AsyncProducer acquireProducer(Endpoint endpoint) {
// Try to favor thread locality as some data in the producer's cache
may be shared among threads,
// triggering cases of false sharing
- if (endpoint == lastUsedEndpoint && endpoint.isSingletonProducer()) {
- return lastUsedProducer;
+ // copy reference to avoid need for synchronization and be thread safe
+ AsyncProducer lastUsedProducerRef = lastUsedProducer;
+ if (lastUsedProducerRef != null && endpoint ==
lastUsedProducerRef.getEndpoint() && endpoint.isSingletonProducer()) {
+ return lastUsedProducerRef;
}
try {
@@ -135,10 +136,7 @@ public class DefaultProducerCache extends ServiceSupport
implements ProducerCach
statistics.onHit(endpoint.getEndpointUri());
}
- synchronized (this) {
- lastUsedEndpoint = endpoint;
- lastUsedProducer = producer;
- }
+ lastUsedProducer = producer;
return producer;
} catch (Exception e) {