This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 3fe53e0894a [CAMEL-20829] Streamline ServicePool synchronization (#14312)
3fe53e0894a is described below
commit 3fe53e0894a1e31dac44d3ac03ac615830612d3c
Author: Guillaume Nodet <[email protected]>
AuthorDate: Tue Jun 4 08:09:44 2024 +0200
[CAMEL-20829] Streamline ServicePool synchronization (#14312)
---
.../apache/camel/support/cache/ServicePool.java | 74 +++++++---------------
1 file changed, 22 insertions(+), 52 deletions(-)
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
index de7b5b513e1..72373c96031 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
@@ -17,11 +17,12 @@
package org.apache.camel.support.cache;
import java.util.ArrayList;
-import java.util.List;
+import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
@@ -53,8 +54,6 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
private final ConcurrentMap<Endpoint, Pool<S>> singlePoolEvicted = new ConcurrentHashMap<>();
private final int capacity;
private final Map<S, S> cache;
- // synchronizes access only to cache
- private final Object cacheLock;
private interface Pool<S> {
S acquire() throws Exception;
@@ -75,11 +74,10 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
this.getEndpoint = getEndpoint;
this.capacity = capacity;
this.cache = capacity > 0 ? LRUCacheFactory.newLRUCache(capacity, this::onEvict) : null;
- this.cacheLock = capacity > 0 ? new Object() : null;
}
/**
- * This callback is invoked by LRUCache from a separate background cleanup thread. Therefore we mark the entries to
+ * This callback is invoked by LRUCache from a separate background cleanup thread. Therefore, we mark the entries to
* be evicted from this thread only, and then let SinglePool and MultiPool handle the evictions (stop the
* producer/consumer safely) when they are acquiring/releases producers/consumers. If we stop the producer/consumer
* from the LRUCache background thread we can have a race condition with a pooled producer may have been acquired at
@@ -117,15 +115,7 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
}
S s = getOrCreatePool(endpoint).acquire();
if (s != null && cache != null) {
- if (isStoppingOrStopped()) {
- // during stopping then access to the cache is synchronized
- synchronized (cacheLock) {
- cache.putIfAbsent(s, s);
- }
- } else {
- // optimize for normal operation
- cache.putIfAbsent(s, s);
- }
+ cache.putIfAbsent(s, s);
}
return s;
}
@@ -182,10 +172,8 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
pool.values().forEach(Pool::stop);
pool.clear();
if (cache != null) {
- synchronized (cacheLock) {
- cache.values().forEach(ServicePool::stop);
- cache.clear();
- }
+ cache.values().forEach(ServicePool::stop);
+ cache.clear();
}
singlePoolEvicted.values().forEach(Pool::stop);
singlePoolEvicted.clear();
@@ -295,29 +283,19 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
* thread at any given time.
*/
private class MultiplePool implements Pool<S> {
- private final Object lock = new Object();
private final Endpoint endpoint;
private final BlockingQueue<S> queue;
- private final List<S> evicts;
+ private final Deque<S> evicts;
MultiplePool(Endpoint endpoint) {
this.endpoint = endpoint;
this.queue = new ArrayBlockingQueue<>(capacity);
- this.evicts = new ArrayList<>();
+ this.evicts = new ConcurrentLinkedDeque<>();
}
private void cleanupEvicts() {
- if (!evicts.isEmpty()) {
- synchronized (lock) {
- if (!evicts.isEmpty()) {
- for (S evict : evicts) {
- queue.remove(evict);
- // stop the service after having removed it from queue
- doStop(evict);
- }
- evicts.clear();
- }
- }
+ for (S evict = evicts.pollFirst(); evict != null; evict = evicts.pollFirst()) {
+ doStop(evict);
}
}
@@ -325,13 +303,10 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
public S acquire() throws Exception {
cleanupEvicts();
- S s;
- synchronized (lock) {
- s = queue.poll();
- if (s == null) {
- s = creator.apply(endpoint);
- s.start();
- }
+ S s = queue.poll();
+ if (s == null) {
+ s = creator.apply(endpoint);
+ s.start();
}
return s;
}
@@ -340,11 +315,9 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
public void release(S s) {
cleanupEvicts();
- synchronized (lock) {
- if (!queue.offer(s)) {
- // there is no room so lets just stop and discard this
- doStop(s);
- }
+ if (!queue.offer(s)) {
+ // there is no room so let's just stop and discard this
+ doStop(s);
}
}
@@ -355,19 +328,16 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
@Override
public void stop() {
- synchronized (lock) {
- queue.forEach(this::doStop);
- queue.clear();
- pool.remove(endpoint);
- }
+ ArrayList<S> list = new ArrayList<>();
+ queue.drainTo(list);
+ pool.remove(endpoint);
+ list.forEach(this::doStop);
}
@Override
public void evict(S s) {
// to be evicted
- synchronized (lock) {
- evicts.add(s);
- }
+ evicts.add(s);
}
@Override