This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 3fe53e0894a [CAMEL-20829] Streamline ServicePool synchronization (#14312)
3fe53e0894a is described below

commit 3fe53e0894a1e31dac44d3ac03ac615830612d3c
Author: Guillaume Nodet <[email protected]>
AuthorDate: Tue Jun 4 08:09:44 2024 +0200

    [CAMEL-20829] Streamline ServicePool synchronization (#14312)
---
 .../apache/camel/support/cache/ServicePool.java    | 74 +++++++---------------
 1 file changed, 22 insertions(+), 52 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
index de7b5b513e1..72373c96031 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
@@ -17,11 +17,12 @@
 package org.apache.camel.support.cache;
 
 import java.util.ArrayList;
-import java.util.List;
+import java.util.Deque;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.Function;
 
@@ -53,8 +54,6 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
     private final ConcurrentMap<Endpoint, Pool<S>> singlePoolEvicted = new 
ConcurrentHashMap<>();
     private final int capacity;
     private final Map<S, S> cache;
-    // synchronizes access only to cache
-    private final Object cacheLock;
 
     private interface Pool<S> {
         S acquire() throws Exception;
@@ -75,11 +74,10 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
         this.getEndpoint = getEndpoint;
         this.capacity = capacity;
         this.cache = capacity > 0 ? LRUCacheFactory.newLRUCache(capacity, 
this::onEvict) : null;
-        this.cacheLock = capacity > 0 ? new Object() : null;
     }
 
     /**
-     * This callback is invoked by LRUCache from a separate background cleanup 
thread. Therefore we mark the entries to
+     * This callback is invoked by LRUCache from a separate background cleanup 
thread. Therefore, we mark the entries to
      * be evicted from this thread only, and then let SinglePool and MultiPool 
handle the evictions (stop the
      * producer/consumer safely) when they are acquiring/releases 
producers/consumers. If we stop the producer/consumer
      * from the LRUCache background thread we can have a race condition with a 
pooled producer may have been acquired at
@@ -117,15 +115,7 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
         }
         S s = getOrCreatePool(endpoint).acquire();
         if (s != null && cache != null) {
-            if (isStoppingOrStopped()) {
-                // during stopping then access to the cache is synchronized
-                synchronized (cacheLock) {
-                    cache.putIfAbsent(s, s);
-                }
-            } else {
-                // optimize for normal operation
-                cache.putIfAbsent(s, s);
-            }
+            cache.putIfAbsent(s, s);
         }
         return s;
     }
@@ -182,10 +172,8 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
         pool.values().forEach(Pool::stop);
         pool.clear();
         if (cache != null) {
-            synchronized (cacheLock) {
-                cache.values().forEach(ServicePool::stop);
-                cache.clear();
-            }
+            cache.values().forEach(ServicePool::stop);
+            cache.clear();
         }
         singlePoolEvicted.values().forEach(Pool::stop);
         singlePoolEvicted.clear();
@@ -295,29 +283,19 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
      * thread at any given time.
      */
     private class MultiplePool implements Pool<S> {
-        private final Object lock = new Object();
         private final Endpoint endpoint;
         private final BlockingQueue<S> queue;
-        private final List<S> evicts;
+        private final Deque<S> evicts;
 
         MultiplePool(Endpoint endpoint) {
             this.endpoint = endpoint;
             this.queue = new ArrayBlockingQueue<>(capacity);
-            this.evicts = new ArrayList<>();
+            this.evicts = new ConcurrentLinkedDeque<>();
         }
 
         private void cleanupEvicts() {
-            if (!evicts.isEmpty()) {
-                synchronized (lock) {
-                    if (!evicts.isEmpty()) {
-                        for (S evict : evicts) {
-                            queue.remove(evict);
-                            // stop the service after having removed it from 
queue
-                            doStop(evict);
-                        }
-                        evicts.clear();
-                    }
-                }
+            for (S evict = evicts.pollFirst(); evict != null; evict = 
evicts.pollFirst()) {
+                doStop(evict);
             }
         }
 
@@ -325,13 +303,10 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
         public S acquire() throws Exception {
             cleanupEvicts();
 
-            S s;
-            synchronized (lock) {
-                s = queue.poll();
-                if (s == null) {
-                    s = creator.apply(endpoint);
-                    s.start();
-                }
+            S s = queue.poll();
+            if (s == null) {
+                s = creator.apply(endpoint);
+                s.start();
             }
             return s;
         }
@@ -340,11 +315,9 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
         public void release(S s) {
             cleanupEvicts();
 
-            synchronized (lock) {
-                if (!queue.offer(s)) {
-                    // there is no room so lets just stop and discard this
-                    doStop(s);
-                }
+            if (!queue.offer(s)) {
+                // there is no room so let's just stop and discard this
+                doStop(s);
             }
         }
 
@@ -355,19 +328,16 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
 
         @Override
         public void stop() {
-            synchronized (lock) {
-                queue.forEach(this::doStop);
-                queue.clear();
-                pool.remove(endpoint);
-            }
+            ArrayList<S> list = new ArrayList<>();
+            queue.drainTo(list);
+            pool.remove(endpoint);
+            list.forEach(this::doStop);
         }
 
         @Override
         public void evict(S s) {
             // to be evicted
-            synchronized (lock) {
-                evicts.add(s);
-            }
+            evicts.add(s);
         }
 
         @Override

Reply via email to