This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new cc4cca5cb51 CAMEL-20199: Remove synchronized block from components N
to Q (#16135)
cc4cca5cb51 is described below
commit cc4cca5cb51c3c7da2ff9aca54c30b45848e6513
Author: Nicolas Filotto <[email protected]>
AuthorDate: Thu Oct 31 18:36:22 2024 +0100
CAMEL-20199: Remove synchronized block from components N to Q (#16135)
## Motivation
For better support of virtual threads, we need to avoid lengthy and
frequent pinning of carrier threads by replacing synchronized blocks with
ReentrantLocks.
## Modifications
* Replace dedicated mutex objects (`Object` monitors) with locks
* Use locks instead of synchronized methods and blocks
* Use ConcurrentHashMap instead of HashMap when possible
---
.../pinecone/PineconeVectorDbEndpoint.java | 15 ++++---
.../camel/component/qdrant/QdrantEndpoint.java | 11 +++--
.../component/netty/http/NettyHttpComponent.java | 17 +++++---
.../component/netty/SharedSingletonObjectPool.java | 31 +++++++++-----
.../camel/component/olingo2/Olingo2AppWrapper.java | 9 ++++-
.../camel/component/olingo2/Olingo2Component.java | 5 ++-
.../camel/component/olingo2/Olingo2Endpoint.java | 9 ++++-
.../olingo2/internal/Olingo2PropertiesHelper.java | 17 ++++++--
.../camel/component/olingo4/Olingo4AppWrapper.java | 9 ++++-
.../camel/component/olingo4/Olingo4Component.java | 5 ++-
.../camel/component/olingo4/Olingo4Endpoint.java | 9 ++++-
.../olingo4/internal/Olingo4PropertiesHelper.java | 17 ++++++--
.../component/opensearch/OpensearchProducer.java | 6 ++-
.../component/optaplanner/OptaPlannerEndpoint.java | 47 +++++++++-------------
.../platform/http/PlatformHttpComponent.java | 6 ++-
.../camel/component/printer/PrintDocument.java | 8 +++-
.../camel/component/pulsar/PulsarProducer.java | 6 ++-
.../component/quickfixj/QuickfixjComponent.java | 16 ++++++--
.../component/quickfixj/QuickfixjEndpoint.java | 8 +++-
19 files changed, 165 insertions(+), 86 deletions(-)
diff --git
a/components/camel-ai/camel-pinecone/src/main/java/org/apache/camel/component/pinecone/PineconeVectorDbEndpoint.java
b/components/camel-ai/camel-pinecone/src/main/java/org/apache/camel/component/pinecone/PineconeVectorDbEndpoint.java
index 5a648f13abb..00b663e7e45 100644
---
a/components/camel-ai/camel-pinecone/src/main/java/org/apache/camel/component/pinecone/PineconeVectorDbEndpoint.java
+++
b/components/camel-ai/camel-pinecone/src/main/java/org/apache/camel/component/pinecone/PineconeVectorDbEndpoint.java
@@ -73,14 +73,19 @@ public class PineconeVectorDbEndpoint extends
DefaultEndpoint {
return collection;
}
- public synchronized Pinecone getClient() {
- if (this.client == null) {
- this.client = this.configuration.getClient();
+ public Pinecone getClient() {
+ lock.lock();
+ try {
if (this.client == null) {
- this.client = createClient();
+ this.client = this.configuration.getClient();
+ if (this.client == null) {
+ this.client = createClient();
+ }
}
+ return this.client;
+ } finally {
+ lock.unlock();
}
- return this.client;
}
@Override
diff --git
a/components/camel-ai/camel-qdrant/src/main/java/org/apache/camel/component/qdrant/QdrantEndpoint.java
b/components/camel-ai/camel-qdrant/src/main/java/org/apache/camel/component/qdrant/QdrantEndpoint.java
index 50b90bf24bc..593885a0fea 100644
---
a/components/camel-ai/camel-qdrant/src/main/java/org/apache/camel/component/qdrant/QdrantEndpoint.java
+++
b/components/camel-ai/camel-qdrant/src/main/java/org/apache/camel/component/qdrant/QdrantEndpoint.java
@@ -55,8 +55,6 @@ public class QdrantEndpoint extends DefaultEndpoint
implements EndpointServiceLo
@UriParam
private QdrantConfiguration configuration;
- private final Object lock;
-
private volatile boolean closeClient;
private volatile QdrantClient client;
@@ -70,8 +68,6 @@ public class QdrantEndpoint extends DefaultEndpoint
implements EndpointServiceLo
this.collection = collection;
this.configuration = configuration;
-
- this.lock = new Object();
}
@Override
@@ -92,9 +88,10 @@ public class QdrantEndpoint extends DefaultEndpoint
implements EndpointServiceLo
return collection;
}
- public synchronized QdrantClient getClient() {
+ public QdrantClient getClient() {
if (this.client == null) {
- synchronized (this.lock) {
+ lock.lock();
+ try {
if (this.client == null) {
this.client = this.configuration.getClient();
this.closeClient = false;
@@ -104,6 +101,8 @@ public class QdrantEndpoint extends DefaultEndpoint
implements EndpointServiceLo
this.closeClient = true;
}
}
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java
index 1b071b2b280..2db0bc72d3d 100644
---
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java
+++
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.netty.http;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
@@ -64,8 +65,8 @@ public class NettyHttpComponent extends NettyComponent
private static final Logger LOG =
LoggerFactory.getLogger(NettyHttpComponent.class);
// factories which is created by this component and therefore manage their
lifecycles
- private final Map<Integer, HttpServerConsumerChannelFactory>
multiplexChannelHandlers = new HashMap<>();
- private final Map<String, HttpServerBootstrapFactory> bootstrapFactories =
new HashMap<>();
+ private final Map<Integer, HttpServerConsumerChannelFactory>
multiplexChannelHandlers = new ConcurrentHashMap<>();
+ private final Map<String, HttpServerBootstrapFactory> bootstrapFactories =
new ConcurrentHashMap<>();
@Metadata(label = "advanced")
private NettyHttpBinding nettyHttpBinding;
@Metadata(label = "advanced")
@@ -329,7 +330,7 @@ public class NettyHttpComponent extends NettyComponent
this.muteException = muteException;
}
- public synchronized HttpServerConsumerChannelFactory
getMultiplexChannelHandler(int port) {
+ public HttpServerConsumerChannelFactory getMultiplexChannelHandler(int
port) {
return multiplexChannelHandlers.computeIfAbsent(port, s ->
newHttpServerConsumerChannelFactory(port));
}
@@ -339,8 +340,14 @@ public class NettyHttpComponent extends NettyComponent
return answer;
}
- protected synchronized HttpServerBootstrapFactory
getOrCreateHttpNettyServerBootstrapFactory(NettyHttpConsumer consumer) {
- String key = consumer.getConfiguration().getAddress();
+ protected HttpServerBootstrapFactory
getOrCreateHttpNettyServerBootstrapFactory(NettyHttpConsumer consumer) {
+ String key;
+ lock.lock();
+ try {
+ key = consumer.getConfiguration().getAddress();
+ } finally {
+ lock.unlock();
+ }
return bootstrapFactories.computeIfAbsent(key, s ->
newHttpServerBootstrapFactory(consumer));
}
diff --git
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java
index c04330c440f..1922aa7eaa0 100644
---
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java
+++
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.netty;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
@@ -31,6 +34,7 @@ import org.slf4j.LoggerFactory;
public class SharedSingletonObjectPool<T> implements ObjectPool<T> {
private static final Logger LOG =
LoggerFactory.getLogger(SharedSingletonObjectPool.class);
+ private final Lock lock = new ReentrantLock();
private final PooledObjectFactory<T> factory;
private volatile PooledObject<T> t;
@@ -44,19 +48,24 @@ public class SharedSingletonObjectPool<T> implements
ObjectPool<T> {
}
@Override
- public synchronized T borrowObject() throws Exception {
- if (t != null) {
- // ensure the object is validated before we borrow it
- if (!factory.validateObject(t)) {
- invalidateObject(t.getObject());
- LOG.info("Recreating new connection as current connection is
invalid: {}", t);
- t = null;
+ public T borrowObject() throws Exception {
+ lock.lock();
+ try {
+ if (t != null) {
+ // ensure the object is validated before we borrow it
+ if (!factory.validateObject(t)) {
+ invalidateObject(t.getObject());
+ LOG.info("Recreating new connection as current connection
is invalid: {}", t);
+ t = null;
+ }
}
+ if (t == null) {
+ t = factory.makeObject();
+ }
+ return t.getObject();
+ } finally {
+ lock.unlock();
}
- if (t == null) {
- t = factory.makeObject();
- }
- return t.getObject();
}
@Override
diff --git
a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2AppWrapper.java
b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2AppWrapper.java
index 53f9a0e35fe..093e78c0750 100644
---
a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2AppWrapper.java
+++
b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2AppWrapper.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.olingo2;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.olingo2.api.Olingo2App;
@@ -31,6 +33,7 @@ import org.apache.olingo.odata2.api.edm.Edm;
*/
public class Olingo2AppWrapper {
+ private final Lock lock = new ReentrantLock();
private final Olingo2App olingo2App;
private volatile Edm edm;
@@ -51,8 +54,8 @@ public class Olingo2AppWrapper {
public Edm getEdm(Map<String, String> endpointHttpHeaders) throws
RuntimeCamelException {
Edm localEdm = edm;
if (localEdm == null) {
-
- synchronized (this) {
+ lock.lock();
+ try {
localEdm = edm;
if (localEdm == null) {
@@ -101,6 +104,8 @@ public class Olingo2AppWrapper {
localEdm = edm;
}
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Component.java
b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Component.java
index 908cc19f591..5109c914533 100644
---
a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Component.java
+++
b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Component.java
@@ -122,10 +122,13 @@ public class Olingo2Component extends
AbstractApiComponent<Olingo2ApiName, Oling
public Olingo2AppWrapper createApiProxy(Olingo2Configuration
endpointConfiguration) {
final Olingo2AppWrapper result;
if (endpointConfiguration.equals(getConfiguration())) {
- synchronized (this) {
+ lock.lock();
+ try {
if (apiProxy == null) {
apiProxy = createOlingo2App(getConfiguration());
}
+ } finally {
+ lock.unlock();
}
result = apiProxy;
} else {
diff --git
a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Endpoint.java
b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Endpoint.java
index 21ddf1e3164..b654051af0f 100644
---
a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Endpoint.java
+++
b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Endpoint.java
@@ -195,8 +195,13 @@ public class Olingo2Endpoint extends
AbstractApiEndpoint<Olingo2ApiName, Olingo2
}
@Override
- public synchronized Object getApiProxy(ApiMethod method, Map<String,
Object> args) {
- return apiProxy.getOlingo2App();
+ public Object getApiProxy(ApiMethod method, Map<String, Object> args) {
+ lock.lock();
+ try {
+ return apiProxy.getOlingo2App();
+ } finally {
+ lock.unlock();
+ }
}
@Override
diff --git
a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/internal/Olingo2PropertiesHelper.java
b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/internal/Olingo2PropertiesHelper.java
index 64984d6a33e..62e01ab9fde 100644
---
a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/internal/Olingo2PropertiesHelper.java
+++
b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/internal/Olingo2PropertiesHelper.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.olingo2.internal;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.camel.CamelContext;
import org.apache.camel.component.olingo2.Olingo2Configuration;
import org.apache.camel.support.component.ApiMethodPropertiesHelper;
@@ -25,16 +28,22 @@ import
org.apache.camel.support.component.ApiMethodPropertiesHelper;
*/
public final class Olingo2PropertiesHelper extends
ApiMethodPropertiesHelper<Olingo2Configuration> {
+ private static final Lock LOCK = new ReentrantLock();
private static Olingo2PropertiesHelper helper;
private Olingo2PropertiesHelper(CamelContext context) {
super(context, Olingo2Configuration.class,
Olingo2Constants.PROPERTY_PREFIX);
}
- public static synchronized Olingo2PropertiesHelper getHelper(CamelContext
context) {
- if (helper == null) {
- helper = new Olingo2PropertiesHelper(context);
+ public static Olingo2PropertiesHelper getHelper(CamelContext context) {
+ LOCK.lock();
+ try {
+ if (helper == null) {
+ helper = new Olingo2PropertiesHelper(context);
+ }
+ return helper;
+ } finally {
+ LOCK.unlock();
}
- return helper;
}
}
diff --git
a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4AppWrapper.java
b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4AppWrapper.java
index f65f025357a..d070b951caf 100644
---
a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4AppWrapper.java
+++
b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4AppWrapper.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.olingo4;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.olingo4.api.Olingo4App;
@@ -32,6 +34,7 @@ import org.apache.olingo.commons.api.edm.Edm;
*/
public class Olingo4AppWrapper {
+ private final Lock lock = new ReentrantLock();
private final Olingo4App olingo4App;
private volatile Edm edm;
@@ -52,8 +55,8 @@ public class Olingo4AppWrapper {
public Edm getEdm(Map<String, String> endpointHttpHeaders) throws
RuntimeCamelException {
Edm localEdm = edm;
if (localEdm == null) {
-
- synchronized (this) {
+ lock.lock();
+ try {
localEdm = edm;
if (localEdm == null) {
@@ -102,6 +105,8 @@ public class Olingo4AppWrapper {
localEdm = edm;
}
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Component.java
b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Component.java
index 5263ee7522a..ffeb4bd2340 100644
---
a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Component.java
+++
b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Component.java
@@ -122,10 +122,13 @@ public class Olingo4Component extends
AbstractApiComponent<Olingo4ApiName, Oling
public Olingo4AppWrapper createApiProxy(Olingo4Configuration
endpointConfiguration) {
final Olingo4AppWrapper result;
if (endpointConfiguration.equals(getConfiguration())) {
- synchronized (this) {
+ lock.lock();
+ try {
if (apiProxy == null) {
apiProxy = createOlingo4App(getConfiguration());
}
+ } finally {
+ lock.unlock();
}
result = apiProxy;
} else {
diff --git
a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Endpoint.java
b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Endpoint.java
index acfd87bc504..fa2a41fb9f8 100644
---
a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Endpoint.java
+++
b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Endpoint.java
@@ -195,8 +195,13 @@ public class Olingo4Endpoint extends
AbstractApiEndpoint<Olingo4ApiName, Olingo4
}
@Override
- public synchronized Object getApiProxy(ApiMethod method, Map<String,
Object> args) {
- return apiProxy.getOlingo4App();
+ public Object getApiProxy(ApiMethod method, Map<String, Object> args) {
+ lock.lock();
+ try {
+ return apiProxy.getOlingo4App();
+ } finally {
+ lock.unlock();
+ }
}
@Override
diff --git
a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/internal/Olingo4PropertiesHelper.java
b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/internal/Olingo4PropertiesHelper.java
index 85fd20a74ad..66464745444 100644
---
a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/internal/Olingo4PropertiesHelper.java
+++
b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/internal/Olingo4PropertiesHelper.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.olingo4.internal;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.camel.CamelContext;
import org.apache.camel.component.olingo4.Olingo4Configuration;
import org.apache.camel.support.component.ApiMethodPropertiesHelper;
@@ -25,16 +28,22 @@ import
org.apache.camel.support.component.ApiMethodPropertiesHelper;
*/
public final class Olingo4PropertiesHelper extends
ApiMethodPropertiesHelper<Olingo4Configuration> {
+ private static final Lock LOCK = new ReentrantLock();
private static Olingo4PropertiesHelper helper;
private Olingo4PropertiesHelper(CamelContext context) {
super(context, Olingo4Configuration.class,
Olingo4Constants.PROPERTY_PREFIX);
}
- public static synchronized Olingo4PropertiesHelper getHelper(CamelContext
context) {
- if (helper == null) {
- helper = new Olingo4PropertiesHelper(context);
+ public static Olingo4PropertiesHelper getHelper(CamelContext context) {
+ LOCK.lock();
+ try {
+ if (helper == null) {
+ helper = new Olingo4PropertiesHelper(context);
+ }
+ return helper;
+ } finally {
+ LOCK.unlock();
}
- return helper;
}
}
diff --git
a/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchProducer.java
b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchProducer.java
index e95a02e133a..ac95285a176 100644
---
a/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchProducer.java
+++
b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchProducer.java
@@ -83,7 +83,6 @@ class OpensearchProducer extends DefaultAsyncProducer {
private static final Logger LOG =
LoggerFactory.getLogger(OpensearchProducer.class);
protected final OpensearchConfiguration configuration;
- private final Object mutex = new Object();
private volatile RestClient client;
private Sniffer sniffer;
@@ -452,7 +451,8 @@ class OpensearchProducer extends DefaultAsyncProducer {
private void startClient() {
if (client == null) {
- synchronized (mutex) {
+ lock.lock();
+ try {
if (client == null) {
LOG.info("Connecting to the OpenSearch cluster: {}",
configuration.getClusterName());
if
(ObjectHelper.isNotEmpty(configuration.getHostAddressesList())
@@ -462,6 +462,8 @@ class OpensearchProducer extends DefaultAsyncProducer {
LOG.warn("Incorrect ip address and port parameters
settings for OpenSearch cluster");
}
}
+ } finally {
+ lock.unlock();
}
}
}
diff --git
a/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerEndpoint.java
b/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerEndpoint.java
index 91651daabb2..bd6937816d0 100644
---
a/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerEndpoint.java
+++
b/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerEndpoint.java
@@ -16,10 +16,10 @@
*/
package org.apache.camel.component.optaplanner;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.camel.Category;
import org.apache.camel.Component;
@@ -38,8 +38,8 @@ import org.optaplanner.core.api.solver.SolverFactory;
@UriEndpoint(firstVersion = "2.13.0", scheme = "optaplanner", title =
"OptaPlanner", syntax = "optaplanner:problemName",
category = { Category.WORKFLOW }, headersClass =
OptaPlannerConstants.class)
public class OptaPlannerEndpoint extends DefaultEndpoint {
- private static final Map<String, Solver<Object>> SOLVERS = new HashMap<>();
- private static final Map<Long, Set<OptaplannerSolutionEventListener>>
SOLUTION_LISTENER = new HashMap<>();
+ private static final Map<String, Solver<Object>> SOLVERS = new
ConcurrentHashMap<>();
+ private static final Map<Long, Set<OptaplannerSolutionEventListener>>
SOLUTION_LISTENER = new ConcurrentHashMap<>();
@UriParam
private OptaPlannerConfiguration configuration;
@@ -54,9 +54,7 @@ public class OptaPlannerEndpoint extends DefaultEndpoint {
}
protected Solver<Object> getOrCreateSolver(String solverId) {
- synchronized (SOLVERS) {
- return SOLVERS.computeIfAbsent(solverId, k -> createSolver());
- }
+ return SOLVERS.computeIfAbsent(solverId, k -> createSolver());
}
protected Solver<Object> createSolver() {
@@ -66,9 +64,7 @@ public class OptaPlannerEndpoint extends DefaultEndpoint {
}
protected Solver<Object> getSolver(String solverId) {
- synchronized (SOLVERS) {
- return SOLVERS.get(solverId);
- }
+ return SOLVERS.get(solverId);
}
@Override
@@ -85,11 +81,9 @@ public class OptaPlannerEndpoint extends DefaultEndpoint {
@Override
protected void doStop() throws Exception {
- synchronized (SOLVERS) {
- for (Map.Entry<String, Solver<Object>> solver :
SOLVERS.entrySet()) {
- solver.getValue().terminateEarly();
- SOLVERS.remove(solver.getKey());
- }
+ for (Map.Entry<String, Solver<Object>> solver : SOLVERS.entrySet()) {
+ solver.getValue().terminateEarly();
+ SOLVERS.remove(solver.getKey());
}
super.doStop();
}
@@ -98,22 +92,17 @@ public class OptaPlannerEndpoint extends DefaultEndpoint {
return SOLUTION_LISTENER.get(problemId);
}
- protected synchronized void addSolutionEventListener(Long problemId,
OptaplannerSolutionEventListener listener) {
- Set<OptaplannerSolutionEventListener> listeners =
SOLUTION_LISTENER.get(problemId);
- if (listeners == null) {
- listeners = new HashSet<>();
- listeners.add(listener);
- SOLUTION_LISTENER.put(problemId, listeners);
- } else {
- listeners.add(listener);
- }
+ protected void addSolutionEventListener(Long problemId,
OptaplannerSolutionEventListener listener) {
+ SOLUTION_LISTENER.computeIfAbsent(problemId, k -> new
HashSet<>()).add(listener);
}
- protected synchronized void removeSolutionEventListener(Long problemId,
OptaplannerSolutionEventListener listener) {
- Set<OptaplannerSolutionEventListener> listeners =
SOLUTION_LISTENER.get(problemId);
- listeners.remove(listener);
- if (listeners.isEmpty()) {
- SOLUTION_LISTENER.remove(problemId);
- }
+ protected void removeSolutionEventListener(Long problemId,
OptaplannerSolutionEventListener listener) {
+ SOLUTION_LISTENER.computeIfPresent(problemId, (k, listeners) -> {
+ listeners.remove(listener);
+ if (listeners.isEmpty()) {
+ return null;
+ }
+ return listeners;
+ });
}
}
diff --git
a/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpComponent.java
b/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpComponent.java
index 7872688d9a0..ef7b49eaac8 100644
---
a/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpComponent.java
+++
b/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpComponent.java
@@ -65,7 +65,6 @@ public class PlatformHttpComponent extends
HeaderFilterStrategyComponent
private final Set<HttpEndpointModel> httpEndpoints = new TreeSet<>();
private final List<PlatformHttpListener> listeners = new ArrayList<>();
private volatile boolean localEngine;
- private final Object lock = new Object();
public PlatformHttpComponent() {
this(null);
@@ -270,7 +269,8 @@ public class PlatformHttpComponent extends
HeaderFilterStrategyComponent
PlatformHttpEngine getOrCreateEngine() {
if (engine == null) {
- synchronized (lock) {
+ lock.lock();
+ try {
if (engine == null) {
LOG.debug("Lookup platform http engine from registry");
@@ -290,6 +290,8 @@ public class PlatformHttpComponent extends
HeaderFilterStrategyComponent
localEngine = true;
}
}
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-printer/src/main/java/org/apache/camel/component/printer/PrintDocument.java
b/components/camel-printer/src/main/java/org/apache/camel/component/printer/PrintDocument.java
index a59d948453d..b959bf10b0a 100644
---
a/components/camel-printer/src/main/java/org/apache/camel/component/printer/PrintDocument.java
+++
b/components/camel-printer/src/main/java/org/apache/camel/component/printer/PrintDocument.java
@@ -21,12 +21,15 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.print.Doc;
import javax.print.DocFlavor;
import javax.print.attribute.DocAttributeSet;
class PrintDocument implements Doc {
+ private final Lock lock = new ReentrantLock();
private DocFlavor docFlavor;
private InputStream stream;
private Reader reader;
@@ -54,7 +57,8 @@ class PrintDocument implements Doc {
@Override
public Reader getReaderForText() throws IOException {
- synchronized (this) {
+ lock.lock();
+ try {
if (reader != null) {
return reader;
}
@@ -75,6 +79,8 @@ class PrintDocument implements Doc {
}
return reader;
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index becd99a162f..00b095266c0 100644
---
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -39,7 +39,6 @@ public class PulsarProducer extends DefaultAsyncProducer {
private static final Logger LOG =
LoggerFactory.getLogger(PulsarProducer.class);
- private final Object mutex = new Object();
private final PulsarEndpoint pulsarEndpoint;
private volatile Producer<byte[]> producer;
@@ -120,7 +119,8 @@ public class PulsarProducer extends DefaultAsyncProducer {
}
private void createProducer() throws PulsarClientException {
- synchronized (mutex) {
+ lock.lock();
+ try {
if (producer == null) {
final String topicUri = pulsarEndpoint.getUri();
PulsarConfiguration configuration =
pulsarEndpoint.getPulsarConfiguration();
@@ -147,6 +147,8 @@ public class PulsarProducer extends DefaultAsyncProducer {
}
producer = producerBuilder.create();
}
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
index a2e1a91a6d7..d9fdc0b4179 100644
---
a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
+++
b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
@@ -41,7 +41,6 @@ public class QuickfixjComponent extends DefaultComponent
implements StartupListe
private static final String PARAMETER_LAZY_CREATE_ENGINE =
"lazyCreateEngine";
- private final Object engineInstancesLock = new Object();
private final Map<String, QuickfixjEngine> engines = new HashMap<>();
private final Map<String, QuickfixjEngine> provisionalEngines = new
HashMap<>();
private final Map<String, QuickfixjEndpoint> endpoints = new HashMap<>();
@@ -70,7 +69,8 @@ public class QuickfixjComponent extends DefaultComponent
implements StartupListe
protected Endpoint createEndpoint(String uri, String remaining,
Map<String, Object> parameters) throws Exception {
// Look up the engine instance based on the settings file ("remaining")
QuickfixjEngine engine;
- synchronized (engineInstancesLock) {
+ lock.lock();
+ try {
QuickfixjEndpoint endpoint = endpoints.get(uri);
if (endpoint == null) {
@@ -114,16 +114,21 @@ public class QuickfixjComponent extends DefaultComponent
implements StartupListe
}
return endpoint;
+ } finally {
+ lock.unlock();
}
}
@Override
protected void doStop() throws Exception {
// stop engines when stopping component
- synchronized (engineInstancesLock) {
+ lock.lock();
+ try {
for (QuickfixjEngine engine : engines.values()) {
engine.stop();
}
+ } finally {
+ lock.unlock();
}
super.doStop();
}
@@ -230,7 +235,8 @@ public class QuickfixjComponent extends DefaultComponent
implements StartupListe
@Override
public void onCamelContextStarted(CamelContext camelContext, boolean
alreadyStarted) throws Exception {
// only start quickfix engines when CamelContext have finished starting
- synchronized (engineInstancesLock) {
+ lock.lock();
+ try {
for (QuickfixjEngine engine : engines.values()) {
startQuickfixjEngine(engine);
}
@@ -239,6 +245,8 @@ public class QuickfixjComponent extends DefaultComponent
implements StartupListe
engines.put(entry.getKey(), entry.getValue());
}
provisionalEngines.clear();
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
index 9553bcf8dba..e9913b16ca1 100644
---
a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
+++
b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.quickfixj;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Category;
import org.apache.camel.Component;
@@ -56,6 +58,7 @@ public class QuickfixjEndpoint extends DefaultEndpoint
implements QuickfixjEvent
private static final Logger LOG =
LoggerFactory.getLogger(QuickfixjEndpoint.class);
private final QuickfixjEngine engine;
+ private final Lock engineLock = new ReentrantLock();
private final List<QuickfixjConsumer> consumers = new
CopyOnWriteArrayList<>();
@UriPath
@@ -229,11 +232,14 @@ public class QuickfixjEndpoint extends DefaultEndpoint
implements QuickfixjEvent
*/
public void ensureInitialized() throws Exception {
if (!engine.isInitialized()) {
- synchronized (engine) {
+ engineLock.lock();
+ try {
if (!engine.isInitialized()) {
engine.initializeEngine();
ServiceHelper.startService(engine);
}
+ } finally {
+ engineLock.unlock();
}
}
}