This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 1e7bd122c8a CAMEL-20199: Remove synchronized blocks from components R
to S (#16288)
1e7bd122c8a is described below
commit 1e7bd122c8ac5e962b7a7878ce2a95cb8b16bfec
Author: Nicolas Filotto <[email protected]>
AuthorDate: Fri Nov 15 09:33:54 2024 +0100
CAMEL-20199: Remove synchronized blocks from components R to S (#16288)
## Motivation
For better support of virtual threads, we need to avoid lengthy and
frequent pinning by replacing synchronized blocks with ReentrantLocks
## Modifications:
* Replace mutex with locks
* Use locks instead of synchronized blocks
* Leverage ConcurrentMap methods to get rid of synchronized blocks
---
.../streams/ReactiveStreamsCamelSubscriber.java | 57 ++++++--
.../reactive/streams/ReactiveStreamsComponent.java | 45 ++++---
.../streams/engine/DelayedMonoPublisher.java | 32 ++++-
.../reactor/engine/ReactorCamelProcessor.java | 69 ++++++----
.../camel/component/rocketmq/RocketMQProducer.java | 5 +-
.../rxjava/engine/RxJavaCamelProcessor.java | 69 ++++++----
.../salesforce/internal/SalesforceSession.java | 136 ++++++++++---------
.../internal/streaming/SubscriptionHelper.java | 86 ++++++------
.../component/scheduler/SchedulerComponent.java | 66 ++++-----
.../camel/component/seda/QueueReference.java | 55 +++++---
.../apache/camel/component/seda/SedaComponent.java | 107 ++++++++-------
.../apache/camel/component/seda/SedaEndpoint.java | 123 +++++++++--------
.../component/servicenow/auth/OAuthToken.java | 102 +++++++-------
.../apache/camel/component/sjms/SjmsComponent.java | 19 ++-
.../apache/camel/component/sjms/SjmsProducer.java | 5 +-
.../sjms/consumer/EndpointMessageListener.java | 17 ++-
.../consumer/SimpleMessageListenerContainer.java | 41 ++++--
.../sjms/reply/MessageSelectorCreator.java | 52 +++++---
.../component/sjms/reply/QueueReplyManager.java | 5 +-
.../component/splunk/SplunkConnectionFactory.java | 148 +++++++++++----------
.../camel/component/splunk/SplunkEndpoint.java | 21 +--
.../component/splunk/support/SplunkDataWriter.java | 30 +++--
.../component/splunk/support/SubmitDataWriter.java | 19 ++-
.../springrabbit/EndpointMessageListener.java | 16 ++-
.../SpringSecurityAuthorizationPolicy.java | 24 ++--
.../spring/ws/SpringWebserviceProducer.java | 12 +-
.../camel/component/event/EventEndpoint.java | 22 ++-
.../spring/spi/TransactionErrorHandlerReifier.java | 43 +++---
.../component/stax/StAXJAXBIteratorExpression.java | 19 +--
.../camel/component/stream/StreamConsumer.java | 62 +++++----
.../camel/component/stream/StreamProducer.java | 6 +-
31 files changed, 900 insertions(+), 613 deletions(-)
diff --git
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java
index ebf827f47ab..444fb42ef67 100644
---
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java
+++
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.reactive.streams;
import java.io.Closeable;
import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Exchange;
import org.reactivestreams.Subscriber;
@@ -37,6 +39,7 @@ public class ReactiveStreamsCamelSubscriber implements
Subscriber<Exchange>, Clo
*/
private static final long UNBOUNDED_REQUESTS = Long.MAX_VALUE;
+ private final Lock lock = new ReentrantLock();
private final String name;
private ReactiveStreamsConsumer consumer;
@@ -52,22 +55,33 @@ public class ReactiveStreamsCamelSubscriber implements
Subscriber<Exchange>, Clo
}
public void attachConsumer(ReactiveStreamsConsumer consumer) {
- synchronized (this) {
+ lock.lock();
+ try {
if (this.consumer != null) {
throw new IllegalStateException("A consumer is already
attached to the stream '" + name + "'");
}
this.consumer = consumer;
+ } finally {
+ lock.unlock();
}
refill();
}
- public synchronized ReactiveStreamsConsumer getConsumer() {
- return consumer;
+ public ReactiveStreamsConsumer getConsumer() {
+ lock.lock();
+ try {
+ return consumer;
+ } finally {
+ lock.unlock();
+ }
}
public void detachConsumer() {
- synchronized (this) {
+ lock.lock();
+ try {
this.consumer = null;
+ } finally {
+ lock.unlock();
}
}
@@ -78,12 +92,15 @@ public class ReactiveStreamsCamelSubscriber implements
Subscriber<Exchange>, Clo
}
boolean allowed = true;
- synchronized (this) {
+ lock.lock();
+ try {
if (this.subscription != null) {
allowed = false;
} else {
this.subscription = subscription;
}
+ } finally {
+ lock.unlock();
}
if (!allowed) {
@@ -101,7 +118,8 @@ public class ReactiveStreamsCamelSubscriber implements
Subscriber<Exchange>, Clo
}
ReactiveStreamsConsumer target;
- synchronized (this) {
+ lock.lock();
+ try {
if (requested < UNBOUNDED_REQUESTS) {
// When there are UNBOUNDED_REQUESTS, they remain constant
requested--;
@@ -110,12 +128,17 @@ public class ReactiveStreamsCamelSubscriber implements
Subscriber<Exchange>, Clo
if (target != null) {
inflightCount++;
}
+ } finally {
+ lock.unlock();
}
if (target != null) {
target.process(exchange, doneSync -> {
- synchronized (this) {
+ lock.lock();
+ try {
inflightCount--;
+ } finally {
+ lock.unlock();
}
refill();
@@ -129,7 +152,8 @@ public class ReactiveStreamsCamelSubscriber implements
Subscriber<Exchange>, Clo
protected void refill() {
Long toBeRequested = null;
Subscription subs = null;
- synchronized (this) {
+ lock.lock();
+ try {
if (consumer != null && this.subscription != null) {
Integer consMax =
consumer.getEndpoint().getMaxInflightExchanges();
long max = (consMax != null && consMax > 0) ?
consMax.longValue() : UNBOUNDED_REQUESTS;
@@ -144,6 +168,8 @@ public class ReactiveStreamsCamelSubscriber implements
Subscriber<Exchange>, Clo
}
}
}
+ } finally {
+ lock.unlock();
}
if (toBeRequested != null) {
@@ -160,9 +186,12 @@ public class ReactiveStreamsCamelSubscriber implements
Subscriber<Exchange>, Clo
LOG.error("Error in reactive stream '{}'", name, throwable);
ReactiveStreamsConsumer consumer;
- synchronized (this) {
+ lock.lock();
+ try {
consumer = this.consumer;
this.subscription = null;
+ } finally {
+ lock.unlock();
}
if (consumer != null) {
@@ -176,9 +205,12 @@ public class ReactiveStreamsCamelSubscriber implements
Subscriber<Exchange>, Clo
LOG.info("Reactive stream '{}' completed", name);
ReactiveStreamsConsumer consumer;
- synchronized (this) {
+ lock.lock();
+ try {
consumer = this.consumer;
this.subscription = null;
+ } finally {
+ lock.unlock();
}
if (consumer != null) {
@@ -189,8 +221,11 @@ public class ReactiveStreamsCamelSubscriber implements
Subscriber<Exchange>, Clo
@Override
public void close() throws IOException {
Subscription subscription;
- synchronized (this) {
+ lock.lock();
+ try {
subscription = this.subscription;
+ } finally {
+ lock.unlock();
}
if (subscription != null) {
diff --git
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
index c4524e6028b..640b9d5f16d 100644
---
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
+++
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
@@ -161,29 +161,34 @@ public class ReactiveStreamsComponent extends
DefaultComponent {
*
* @return the reactive streams service
*/
- public synchronized CamelReactiveStreamsService
getReactiveStreamsService() {
- if (reactiveStreamsEngineConfiguration == null) {
- reactiveStreamsEngineConfiguration = new
ReactiveStreamsEngineConfiguration();
-
reactiveStreamsEngineConfiguration.setThreadPoolMaxSize(threadPoolMaxSize);
-
reactiveStreamsEngineConfiguration.setThreadPoolMinSize(threadPoolMinSize);
-
reactiveStreamsEngineConfiguration.setThreadPoolName(threadPoolName);
- }
+ public CamelReactiveStreamsService getReactiveStreamsService() {
+ lock.lock();
+ try {
+ if (reactiveStreamsEngineConfiguration == null) {
+ reactiveStreamsEngineConfiguration = new
ReactiveStreamsEngineConfiguration();
+
reactiveStreamsEngineConfiguration.setThreadPoolMaxSize(threadPoolMaxSize);
+
reactiveStreamsEngineConfiguration.setThreadPoolMinSize(threadPoolMinSize);
+
reactiveStreamsEngineConfiguration.setThreadPoolName(threadPoolName);
+ }
- if (service == null) {
- this.service = ReactiveStreamsHelper.resolveReactiveStreamsService(
- getCamelContext(),
- this.serviceType,
- this.reactiveStreamsEngineConfiguration);
-
- try {
- // Start the service and add it to the Camel context to expose
managed attributes
- getCamelContext().addService(service, true, true);
- } catch (Exception e) {
- throw new RuntimeCamelException(e);
+ if (service == null) {
+ this.service =
ReactiveStreamsHelper.resolveReactiveStreamsService(
+ getCamelContext(),
+ this.serviceType,
+ this.reactiveStreamsEngineConfiguration);
+
+ try {
+ // Start the service and add it to the Camel context to
expose managed attributes
+ getCamelContext().addService(service, true, true);
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
}
- }
- return service;
+ return service;
+ } finally {
+ lock.unlock();
+ }
}
// ****************************************
diff --git
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
index dd8c4929c7d..e9615048c20 100644
---
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
+++
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
@@ -22,6 +22,8 @@ import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@@ -126,6 +128,7 @@ public class DelayedMonoPublisher<T> implements
Publisher<T> {
private volatile boolean requested;
private final Subscriber<? super T> subscriber;
+ private final Lock lock = new ReentrantLock();
private MonoSubscription(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
@@ -133,21 +136,30 @@ public class DelayedMonoPublisher<T> implements
Publisher<T> {
@Override
public void request(long l) {
- synchronized (this) {
+ lock.lock();
+ try {
if (terminated) {
// just ignore the request
return;
}
+ } finally {
+ lock.unlock();
}
if (l <= 0) {
subscriber.onError(new IllegalArgumentException("3.9"));
- synchronized (this) {
+ lock.lock();
+ try {
terminated = true;
+ } finally {
+ lock.unlock();
}
} else {
- synchronized (this) {
+ lock.lock();
+ try {
requested = true;
+ } finally {
+ lock.unlock();
}
}
@@ -155,12 +167,15 @@ public class DelayedMonoPublisher<T> implements
Publisher<T> {
}
public void flush() {
- synchronized (this) {
+ lock.lock();
+ try {
if (!isReady()) {
return;
}
terminated = true;
+ } finally {
+ lock.unlock();
}
if (data != null) {
@@ -180,8 +195,13 @@ public class DelayedMonoPublisher<T> implements
Publisher<T> {
}
@Override
- public synchronized void cancel() {
- terminated = true;
+ public void cancel() {
+ lock.lock();
+ try {
+ terminated = true;
+ } finally {
+ lock.unlock();
+ }
}
}
}
diff --git
a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorCamelProcessor.java
b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorCamelProcessor.java
index c33100a813d..47733ed920d 100644
---
a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorCamelProcessor.java
+++
b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorCamelProcessor.java
@@ -20,6 +20,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Exchange;
import
org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
@@ -40,6 +42,7 @@ final class ReactorCamelProcessor implements Closeable {
private final AtomicReference<FluxSink<Exchange>> camelSink;
private final ReactorStreamsService service;
+ private final Lock lock = new ReentrantLock();
private ReactiveStreamsProducer camelProducer;
ReactorCamelProcessor(ReactorStreamsService service, String name) {
@@ -64,41 +67,51 @@ final class ReactorCamelProcessor implements Closeable {
return publisher;
}
- synchronized void attach(ReactiveStreamsProducer producer) {
- Objects.requireNonNull(producer, "producer cannot be null, use the
detach method");
+ void attach(ReactiveStreamsProducer producer) {
+ lock.lock();
+ try {
+ Objects.requireNonNull(producer, "producer cannot be null, use the
detach method");
- if (this.camelProducer != null) {
- throw new IllegalStateException("A producer is already attached to
the stream '" + name + "'");
- }
-
- if (this.camelProducer != producer) { // this condition is always true
- detach();
-
- ReactiveStreamsBackpressureStrategy strategy =
producer.getEndpoint().getBackpressureStrategy();
- Flux<Exchange> flux = Flux.create(camelSink::set,
FluxSink.OverflowStrategy.IGNORE);
-
- if (ObjectHelper.equal(strategy,
ReactiveStreamsBackpressureStrategy.OLDEST)) {
- // signal item emitted for non-dropped items only
- flux =
flux.onBackpressureDrop(this::onBackPressure).handle(this::onItemEmitted);
- } else if (ObjectHelper.equal(strategy,
ReactiveStreamsBackpressureStrategy.LATEST)) {
- // Since there is no callback for dropped elements on
backpressure "latest", item emission is signaled before dropping
- // No exception is reported back to the exchanges
- flux = flux.handle(this::onItemEmitted).onBackpressureLatest();
- } else {
- // Default strategy is BUFFER
- flux = flux.onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE,
this::onBackPressure).handle(this::onItemEmitted);
+ if (this.camelProducer != null) {
+ throw new IllegalStateException("A producer is already
attached to the stream '" + name + "'");
}
- flux.subscribe(this.publisher);
+ if (this.camelProducer != producer) { // this condition is always
true
+ detach();
+
+ ReactiveStreamsBackpressureStrategy strategy =
producer.getEndpoint().getBackpressureStrategy();
+ Flux<Exchange> flux = Flux.create(camelSink::set,
FluxSink.OverflowStrategy.IGNORE);
+
+ if (ObjectHelper.equal(strategy,
ReactiveStreamsBackpressureStrategy.OLDEST)) {
+ // signal item emitted for non-dropped items only
+ flux =
flux.onBackpressureDrop(this::onBackPressure).handle(this::onItemEmitted);
+ } else if (ObjectHelper.equal(strategy,
ReactiveStreamsBackpressureStrategy.LATEST)) {
+ // Since there is no callback for dropped elements on
backpressure "latest", item emission is signaled before dropping
+ // No exception is reported back to the exchanges
+ flux =
flux.handle(this::onItemEmitted).onBackpressureLatest();
+ } else {
+ // Default strategy is BUFFER
+ flux = flux.onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE,
this::onBackPressure)
+ .handle(this::onItemEmitted);
+ }
+
+ flux.subscribe(this.publisher);
- camelProducer = producer;
+ camelProducer = producer;
+ }
+ } finally {
+ lock.unlock();
}
}
- synchronized void detach() {
-
- this.camelProducer = null;
- this.camelSink.set(null);
+ void detach() {
+ lock.lock();
+ try {
+ this.camelProducer = null;
+ this.camelSink.set(null);
+ } finally {
+ lock.unlock();
+ }
}
void send(Exchange exchange) {
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
index a07006f7695..defd5ea0ebd 100644
---
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
@@ -136,7 +136,8 @@ public class RocketMQProducer extends DefaultAsyncProducer {
protected void initReplyManager() {
if (!started.get()) {
- synchronized (this) {
+ lock.lock();
+ try {
if (started.get()) {
return;
}
@@ -160,6 +161,8 @@ public class RocketMQProducer extends DefaultAsyncProducer {
}
}
started.set(true);
+ } finally {
+ lock.unlock();
}
}
}
diff --git
a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java
b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java
index 42a29c022c4..ef6135968b1 100644
---
a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java
+++
b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java
@@ -20,6 +20,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
@@ -39,6 +41,7 @@ final class RxJavaCamelProcessor implements Closeable {
private final RxJavaStreamsService service;
private final AtomicReference<FlowableEmitter<Exchange>> camelEmitter;
private final FlowableProcessor<Exchange> publisher;
+ private final Lock lock = new ReentrantLock();
private ReactiveStreamsProducer camelProducer;
RxJavaCamelProcessor(RxJavaStreamsService service, String name) {
@@ -58,40 +61,50 @@ final class RxJavaCamelProcessor implements Closeable {
return publisher;
}
- synchronized void attach(ReactiveStreamsProducer producer) {
- Objects.requireNonNull(producer, "producer cannot be null, use the
detach method");
+ void attach(ReactiveStreamsProducer producer) {
+ lock.lock();
+ try {
+ Objects.requireNonNull(producer, "producer cannot be null, use the
detach method");
- if (this.camelProducer != null) {
- throw new IllegalStateException("A producer is already attached to
the stream '" + name + "'");
- }
-
- if (this.camelProducer != producer) {
- detach();
-
- ReactiveStreamsBackpressureStrategy strategy =
producer.getEndpoint().getBackpressureStrategy();
- Flowable<Exchange> flow = Flowable.create(camelEmitter::set,
BackpressureStrategy.MISSING);
-
- if (ObjectHelper.equal(strategy,
ReactiveStreamsBackpressureStrategy.OLDEST)) {
- flow.onBackpressureDrop(this::onBackPressure)
- .doAfterNext(this::onItemEmitted)
- .subscribe(this.publisher);
- } else if (ObjectHelper.equal(strategy,
ReactiveStreamsBackpressureStrategy.LATEST)) {
- flow.doAfterNext(this::onItemEmitted)
- .onBackpressureLatest()
- .subscribe(this.publisher);
- } else {
- flow.doAfterNext(this::onItemEmitted)
- .onBackpressureBuffer()
- .subscribe(this.publisher);
+ if (this.camelProducer != null) {
+ throw new IllegalStateException("A producer is already
attached to the stream '" + name + "'");
}
- camelProducer = producer;
+ if (this.camelProducer != producer) {
+ detach();
+
+ ReactiveStreamsBackpressureStrategy strategy =
producer.getEndpoint().getBackpressureStrategy();
+ Flowable<Exchange> flow = Flowable.create(camelEmitter::set,
BackpressureStrategy.MISSING);
+
+ if (ObjectHelper.equal(strategy,
ReactiveStreamsBackpressureStrategy.OLDEST)) {
+ flow.onBackpressureDrop(this::onBackPressure)
+ .doAfterNext(this::onItemEmitted)
+ .subscribe(this.publisher);
+ } else if (ObjectHelper.equal(strategy,
ReactiveStreamsBackpressureStrategy.LATEST)) {
+ flow.doAfterNext(this::onItemEmitted)
+ .onBackpressureLatest()
+ .subscribe(this.publisher);
+ } else {
+ flow.doAfterNext(this::onItemEmitted)
+ .onBackpressureBuffer()
+ .subscribe(this.publisher);
+ }
+
+ camelProducer = producer;
+ }
+ } finally {
+ lock.unlock();
}
}
- synchronized void detach() {
- this.camelProducer = null;
- this.camelEmitter.set(null);
+ void detach() {
+ lock.lock();
+ try {
+ this.camelProducer = null;
+ this.camelEmitter.set(null);
+ } finally {
+ lock.unlock();
+ }
}
void send(Exchange exchange) {
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
index 60262a5b9c5..12c7c9dddaf 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
@@ -157,40 +157,44 @@ public class SalesforceSession extends ServiceSupport {
}
}
- public synchronized String login(String oldToken) throws
SalesforceException {
-
- // check if we need a new session
- // this way there's always a single valid session
- if (accessToken == null || accessToken.equals(oldToken)) {
+ public String login(String oldToken) throws SalesforceException {
+ lock.lock();
+ try {
+ // check if we need a new session
+ // this way there's always a single valid session
+ if (accessToken == null || accessToken.equals(oldToken)) {
- // try revoking the old access token before creating a new one
- accessToken = oldToken;
- if (accessToken != null) {
- try {
- logout();
- } catch (SalesforceException e) {
- LOG.warn("Error revoking old access token: {}",
e.getMessage(), e);
+ // try revoking the old access token before creating a new one
+ accessToken = oldToken;
+ if (accessToken != null) {
+ try {
+ logout();
+ } catch (SalesforceException e) {
+ LOG.warn("Error revoking old access token: {}",
e.getMessage(), e);
+ }
+ accessToken = null;
}
- accessToken = null;
- }
- // login to Salesforce and get session id
- final Request loginPost = getLoginRequest(null);
- try {
+ // login to Salesforce and get session id
+ final Request loginPost = getLoginRequest(null);
+ try {
- final ContentResponse loginResponse = loginPost.send();
- parseLoginResponse(loginResponse,
loginResponse.getContentAsString());
+ final ContentResponse loginResponse = loginPost.send();
+ parseLoginResponse(loginResponse,
loginResponse.getContentAsString());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SalesforceException("Login error: interrupted", e);
- } catch (TimeoutException e) {
- throw new SalesforceException("Login request timeout: " +
e.getMessage(), e);
- } catch (ExecutionException e) {
- throw new SalesforceException("Unexpected login error: " +
e.getCause().getMessage(), e.getCause());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SalesforceException("Login error: interrupted",
e);
+ } catch (TimeoutException e) {
+ throw new SalesforceException("Login request timeout: " +
e.getMessage(), e);
+ } catch (ExecutionException e) {
+ throw new SalesforceException("Unexpected login error: " +
e.getCause().getMessage(), e.getCause());
+ }
}
+ return accessToken;
+ } finally {
+ lock.unlock();
}
- return accessToken;
}
/**
@@ -301,11 +305,11 @@ public class SalesforceSession extends ServiceSupport {
* Parses login response, allows SalesforceSecurityHandler to parse a
login request for a failed authentication
* conversation.
*/
- public synchronized void parseLoginResponse(ContentResponse loginResponse,
String responseContent)
+ public void parseLoginResponse(ContentResponse loginResponse, String
responseContent)
throws SalesforceException {
- final int responseStatus = loginResponse.getStatus();
-
+ lock.lock();
try {
+ final int responseStatus = loginResponse.getStatus();
switch (responseStatus) {
case HttpStatus.OK_200:
// parse the response to get token
@@ -352,47 +356,55 @@ public class SalesforceSession extends ServiceSupport {
} catch (IOException e) {
String msg = "Login error: response parse exception " +
e.getMessage();
throw new SalesforceException(msg, e);
+ } finally {
+ lock.unlock();
}
}
- public synchronized void logout() throws SalesforceException {
- if (accessToken == null) {
- return;
- }
-
+ public void logout() throws SalesforceException {
+ lock.lock();
try {
- String logoutUrl = (instanceUrl == null ? config.getLoginUrl() :
instanceUrl) + OAUTH2_REVOKE_PATH + accessToken;
- final Request logoutGet =
httpClient.newRequest(logoutUrl).timeout(timeout, TimeUnit.MILLISECONDS);
- final ContentResponse logoutResponse = logoutGet.send();
+ if (accessToken == null) {
+ return;
+ }
- final int statusCode = logoutResponse.getStatus();
+ try {
+ String logoutUrl
+ = (instanceUrl == null ? config.getLoginUrl() :
instanceUrl) + OAUTH2_REVOKE_PATH + accessToken;
+ final Request logoutGet =
httpClient.newRequest(logoutUrl).timeout(timeout, TimeUnit.MILLISECONDS);
+ final ContentResponse logoutResponse = logoutGet.send();
- if (statusCode == HttpStatus.OK_200) {
- LOG.debug("Logout successful");
- } else {
- LOG.debug("Failed to revoke OAuth token. This is expected if
the token is invalid or already expired");
- }
+ final int statusCode = logoutResponse.getStatus();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SalesforceException("Interrupted while logging out", e);
- } catch (ExecutionException e) {
- final Throwable ex = e.getCause();
- throw new SalesforceException("Unexpected logout exception: " +
ex.getMessage(), ex);
- } catch (TimeoutException e) {
- throw new SalesforceException("Logout request TIMEOUT!", e);
- } finally {
- // reset session
- accessToken = null;
- instanceUrl = null;
- // notify all session listeners about logout
- for (SalesforceSessionListener listener : listeners) {
- try {
- listener.onLogout();
- } catch (Exception t) {
- LOG.warn("Unexpected error from listener {}: {}",
listener, t.getMessage());
+ if (statusCode == HttpStatus.OK_200) {
+ LOG.debug("Logout successful");
+ } else {
+ LOG.debug("Failed to revoke OAuth token. This is expected
if the token is invalid or already expired");
+ }
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SalesforceException("Interrupted while logging out",
e);
+ } catch (ExecutionException e) {
+ final Throwable ex = e.getCause();
+ throw new SalesforceException("Unexpected logout exception: "
+ ex.getMessage(), ex);
+ } catch (TimeoutException e) {
+ throw new SalesforceException("Logout request TIMEOUT!", e);
+ } finally {
+ // reset session
+ accessToken = null;
+ instanceUrl = null;
+ // notify all session listeners about logout
+ for (SalesforceSessionListener listener : listeners) {
+ try {
+ listener.onLogout();
+ } catch (Exception t) {
+ LOG.warn("Unexpected error from listener {}: {}",
listener, t.getMessage());
+ }
}
}
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 0ffda9096fe..894fcb16820 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -413,25 +413,30 @@ public class SubscriptionHelper extends ServiceSupport {
return client;
}
- public synchronized void subscribe(StreamingApiConsumer consumer) {
- // create subscription for consumer
- final String channelName = getChannelName(consumer.getTopicName());
- channelToConsumers.computeIfAbsent(channelName, key ->
ConcurrentHashMap.newKeySet()).add(consumer);
- channelsToSubscribe.add(channelName);
-
- setReplayIdIfAbsent(consumer.getEndpoint());
-
- // channel message listener
- LOG.info("Subscribing to channel {}...", channelName);
- var messageListener = consumerToListener.computeIfAbsent(consumer, key
-> (channel, message) -> {
- LOG.debug("Received Message: {}", message);
- // convert CometD message to Camel Message
- consumer.processMessage(channel, message);
- });
-
- // subscribe asynchronously
- final ClientSessionChannel clientChannel =
client.getChannel(channelName);
- clientChannel.subscribe(messageListener);
+ public void subscribe(StreamingApiConsumer consumer) {
+ lock.lock();
+ try {
+ // create subscription for consumer
+ final String channelName = getChannelName(consumer.getTopicName());
+ channelToConsumers.computeIfAbsent(channelName, key ->
ConcurrentHashMap.newKeySet()).add(consumer);
+ channelsToSubscribe.add(channelName);
+
+ setReplayIdIfAbsent(consumer.getEndpoint());
+
+ // channel message listener
+ LOG.info("Subscribing to channel {}...", channelName);
+ var messageListener = consumerToListener.computeIfAbsent(consumer,
key -> (channel, message) -> {
+ LOG.debug("Received Message: {}", message);
+ // convert CometD message to Camel Message
+ consumer.processMessage(channel, message);
+ });
+
+ // subscribe asynchronously
+ final ClientSessionChannel clientChannel =
client.getChannel(channelName);
+ clientChannel.subscribe(messageListener);
+ } finally {
+ lock.unlock();
+ }
}
private static boolean isTemporaryError(Message message) {
@@ -506,26 +511,31 @@ public class SubscriptionHelper extends ServiceSupport {
return channelName.toString();
}
- public synchronized void unsubscribe(StreamingApiConsumer consumer) {
- // channel name
- final String channelName = getChannelName(consumer.getTopicName());
-
- // unsubscribe from channel
- var consumers = channelToConsumers.get(channelName);
- if (consumers != null) {
- consumers.remove(consumer);
- if (consumers.isEmpty()) {
- channelToConsumers.remove(channelName);
+ public void unsubscribe(StreamingApiConsumer consumer) {
+ lock.lock();
+ try {
+ // channel name
+ final String channelName = getChannelName(consumer.getTopicName());
+
+ // unsubscribe from channel
+ var consumers = channelToConsumers.get(channelName);
+ if (consumers != null) {
+ consumers.remove(consumer);
+ if (consumers.isEmpty()) {
+ channelToConsumers.remove(channelName);
+ }
}
- }
- final ClientSessionChannel.MessageListener listener =
consumerToListener.remove(consumer);
- if (listener != null) {
- LOG.debug("Unsubscribing from channel {}...", channelName);
- final ClientSessionChannel clientChannel =
client.getChannel(channelName);
- // if there are other listeners on this channel, an unsubscribe
message will not be sent,
- // so we're not going to listen for and expect an unsub response.
Just unsub and move on.
- clientChannel.unsubscribe(listener);
- clientChannel.release();
+ final ClientSessionChannel.MessageListener listener =
consumerToListener.remove(consumer);
+ if (listener != null) {
+ LOG.debug("Unsubscribing from channel {}...", channelName);
+ final ClientSessionChannel clientChannel =
client.getChannel(channelName);
+ // if there are other listeners on this channel, an
unsubscribe message will not be sent,
+ // so we're not going to listen for and expect an unsub
response. Just unsub and move on.
+ clientChannel.unsubscribe(listener);
+ clientChannel.release();
+ }
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerComponent.java
b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerComponent.java
index 4c23a7e6bb4..52cdbe76311 100644
---
a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerComponent.java
+++
b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerComponent.java
@@ -17,8 +17,8 @@
package org.apache.camel.component.scheduler;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@@ -29,8 +29,7 @@ import org.apache.camel.support.HealthCheckComponent;
@org.apache.camel.spi.annotations.Component("scheduler")
public class SchedulerComponent extends HealthCheckComponent {
- private final Map<String, ScheduledExecutorService> executors = new
HashMap<>();
- private final Map<String, AtomicInteger> refCounts = new HashMap<>();
+ private final Map<String, ScheduledExecutorServiceHolder> executors = new
ConcurrentHashMap<>();
@Metadata
private boolean includeMetadata;
@@ -75,53 +74,46 @@ public class SchedulerComponent extends
HealthCheckComponent {
protected ScheduledExecutorService addConsumer(SchedulerConsumer consumer)
{
String name = consumer.getEndpoint().getName();
- int poolSize = consumer.getEndpoint().getPoolSize();
-
- ScheduledExecutorService answer;
- synchronized (executors) {
- answer = executors.get(name);
- if (answer == null) {
- answer =
getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
"scheduler://" + name,
- poolSize);
- executors.put(name, answer);
- // store new reference counter
- refCounts.put(name, new AtomicInteger(1));
- } else {
- // increase reference counter
- AtomicInteger counter = refCounts.get(name);
- if (counter != null) {
- counter.incrementAndGet();
- }
+ return executors.compute(name, (k, v) -> {
+ if (v == null) {
+ int poolSize = consumer.getEndpoint().getPoolSize();
+ return new ScheduledExecutorServiceHolder(
+
getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
"scheduler://" + name,
+ poolSize));
}
- }
- return answer;
+ v.refCount.incrementAndGet();
+ return v;
+ }).executorService;
}
protected void removeConsumer(SchedulerConsumer consumer) {
String name = consumer.getEndpoint().getName();
- synchronized (executors) {
- // decrease reference counter
- AtomicInteger counter = refCounts.get(name);
- if (counter != null && counter.decrementAndGet() <= 0) {
- refCounts.remove(name);
- // remove scheduler as its no longer in use
- ScheduledExecutorService scheduler = executors.remove(name);
- if (scheduler != null) {
-
getCamelContext().getExecutorServiceManager().shutdown(scheduler);
- }
+ executors.computeIfPresent(name, (k, v) -> {
+ if (v.refCount.decrementAndGet() == 0) {
+
getCamelContext().getExecutorServiceManager().shutdown(v.executorService);
+ return null;
}
- }
+ return v;
+ });
}
@Override
protected void doStop() throws Exception {
- Collection<ScheduledExecutorService> collection = executors.values();
- for (ScheduledExecutorService scheduler : collection) {
- getCamelContext().getExecutorServiceManager().shutdown(scheduler);
+ Collection<ScheduledExecutorServiceHolder> collection =
executors.values();
+ for (ScheduledExecutorServiceHolder holder : collection) {
+
getCamelContext().getExecutorServiceManager().shutdown(holder.executorService);
}
executors.clear();
- refCounts.clear();
}
+ private static class ScheduledExecutorServiceHolder {
+ private final ScheduledExecutorService executorService;
+ private final AtomicInteger refCount;
+
+ ScheduledExecutorServiceHolder(ScheduledExecutorService
executorService) {
+ this.executorService = executorService;
+ this.refCount = new AtomicInteger(1);
+ }
+ }
}
diff --git
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/QueueReference.java
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/QueueReference.java
index 61dd5922259..4efa264ba34 100644
---
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/QueueReference.java
+++
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/QueueReference.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.seda;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Exchange;
@@ -34,7 +36,8 @@ public final class QueueReference {
private Integer size;
private Boolean multipleConsumers;
- private List<SedaEndpoint> endpoints = new LinkedList<>();
+ private final Lock lock = new ReentrantLock();
+ private final List<SedaEndpoint> endpoints = new LinkedList<>();
QueueReference(BlockingQueue<Exchange> queue, Integer size, Boolean
multipleConsumers) {
this.queue = queue;
@@ -42,27 +45,40 @@ public final class QueueReference {
this.multipleConsumers = multipleConsumers;
}
- synchronized void addReference(SedaEndpoint endpoint) {
- if (!endpoints.contains(endpoint)) {
- endpoints.add(endpoint);
- // update the multipleConsumers setting if need
- if (endpoint.isMultipleConsumers()) {
- multipleConsumers = true;
+ void addReference(SedaEndpoint endpoint) {
+ lock.lock();
+ try {
+ if (!endpoints.contains(endpoint)) {
+ endpoints.add(endpoint);
+ // update the multipleConsumers setting if need
+ if (endpoint.isMultipleConsumers()) {
+ multipleConsumers = true;
+ }
}
+ } finally {
+ lock.unlock();
}
}
- synchronized void removeReference(SedaEndpoint endpoint) {
- if (endpoints.contains(endpoint)) {
+ void removeReference(SedaEndpoint endpoint) {
+ lock.lock();
+ try {
endpoints.remove(endpoint);
+ } finally {
+ lock.unlock();
}
}
/**
* Gets the reference counter
*/
- public synchronized int getCount() {
- return endpoints.size();
+ public int getCount() {
+ lock.lock();
+ try {
+ return endpoints.size();
+ } finally {
+ lock.unlock();
+ }
}
/**
@@ -85,13 +101,18 @@ public final class QueueReference {
return queue;
}
- public synchronized boolean hasConsumers() {
- for (SedaEndpoint endpoint : endpoints) {
- if (!endpoint.getConsumers().isEmpty()) {
- return true;
+ public boolean hasConsumers() {
+ lock.lock();
+ try {
+ for (SedaEndpoint endpoint : endpoints) {
+ if (!endpoint.getConsumers().isEmpty()) {
+ return true;
+ }
}
- }
- return false;
+ return false;
+ } finally {
+ lock.unlock();
+ }
}
}
diff --git
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaComponent.java
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaComponent.java
index cdf46a989ef..05dfb0bb2f0 100644
---
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaComponent.java
+++
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaComponent.java
@@ -144,69 +144,78 @@ public class SedaComponent extends DefaultComponent {
this.defaultPollTimeout = defaultPollTimeout;
}
- public synchronized QueueReference getOrCreateQueue(
+ public QueueReference getOrCreateQueue(
SedaEndpoint endpoint, Integer size, Boolean multipleConsumers,
BlockingQueueFactory<Exchange> customQueueFactory) {
+ lock.lock();
+ try {
+ String key = getQueueKey(endpoint.getEndpointUri());
- String key = getQueueKey(endpoint.getEndpointUri());
-
- if (size == null) {
- // there may be a custom size during startup
- size = customSize.get(key);
- }
-
- QueueReference ref = getQueues().get(key);
- if (ref != null) {
- // if the given size is not provided, we just use the existing
queue as is
- if (size != null && !size.equals(ref.getSize())) {
- // there is already a queue, so make sure the size matches
- throw new IllegalArgumentException(
- "Cannot use existing queue " + key + " as the existing
queue size "
- + (ref.getSize() != null ?
ref.getSize() : SedaConstants.QUEUE_SIZE)
- + " does not match given
queue size " + size);
+ if (size == null) {
+ // there may be a custom size during startup
+ size = customSize.get(key);
}
- // add the reference before returning queue
- ref.addReference(endpoint);
- if (log.isDebugEnabled()) {
- log.debug("Reusing existing queue {} with size {} and
reference count {}", key, size, ref.getCount());
+ QueueReference ref = getQueues().get(key);
+ if (ref != null) {
+ // if the given size is not provided, we just use the existing
queue as is
+ if (size != null && !size.equals(ref.getSize())) {
+ // there is already a queue, so make sure the size matches
+ throw new IllegalArgumentException(
+ "Cannot use existing queue " + key + " as the
existing queue size "
+ + (ref.getSize() !=
null ? ref.getSize() : SedaConstants.QUEUE_SIZE)
+ + " does not match
given queue size " + size);
+ }
+ // add the reference before returning queue
+ ref.addReference(endpoint);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Reusing existing queue {} with size {} and
reference count {}", key, size, ref.getCount());
+ }
+ return ref;
}
- return ref;
- }
- // create queue
- BlockingQueue<Exchange> queue;
- BlockingQueueFactory<Exchange> queueFactory = customQueueFactory ==
null ? defaultQueueFactory : customQueueFactory;
- if (size != null && size > 0) {
- queue = queueFactory.create(size);
- } else {
- if (getQueueSize() > 0) {
- size = getQueueSize();
- queue = queueFactory.create(getQueueSize());
+ // create queue
+ BlockingQueue<Exchange> queue;
+ BlockingQueueFactory<Exchange> queueFactory = customQueueFactory
== null ? defaultQueueFactory : customQueueFactory;
+ if (size != null && size > 0) {
+ queue = queueFactory.create(size);
} else {
- queue = queueFactory.create();
+ if (getQueueSize() > 0) {
+ size = getQueueSize();
+ queue = queueFactory.create(getQueueSize());
+ } else {
+ queue = queueFactory.create();
+ }
}
- }
- log.debug("Created queue {} with size {}", key, size);
+ log.debug("Created queue {} with size {}", key, size);
- // create and add a new reference queue
- ref = new QueueReference(queue, size, multipleConsumers);
- ref.addReference(endpoint);
- getQueues().put(key, ref);
+ // create and add a new reference queue
+ ref = new QueueReference(queue, size, multipleConsumers);
+ ref.addReference(endpoint);
+ getQueues().put(key, ref);
- return ref;
+ return ref;
+ } finally {
+ lock.unlock();
+ }
}
- public synchronized QueueReference registerQueue(SedaEndpoint endpoint,
BlockingQueue<Exchange> queue) {
- String key = getQueueKey(endpoint.getEndpointUri());
+ public QueueReference registerQueue(SedaEndpoint endpoint,
BlockingQueue<Exchange> queue) {
+ lock.lock();
+ try {
+ String key = getQueueKey(endpoint.getEndpointUri());
- QueueReference ref = getQueues().get(key);
- if (ref == null) {
- ref = new QueueReference(queue, endpoint.getSize(),
endpoint.isMultipleConsumers());
- ref.addReference(endpoint);
- getQueues().put(key, ref);
- }
+ QueueReference ref = getQueues().get(key);
+ if (ref == null) {
+ ref = new QueueReference(queue, endpoint.getSize(),
endpoint.isMultipleConsumers());
+ ref.addReference(endpoint);
+ getQueues().put(key, ref);
+ }
- return ref;
+ return ref;
+ } finally {
+ lock.unlock();
+ }
}
public Map<String, QueueReference> getQueues() {
diff --git
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index d03d320e6be..8c4710edc04 100644
---
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -183,30 +183,35 @@ public class SedaEndpoint extends DefaultEndpoint
implements AsyncEndpoint, Brow
return answer;
}
- public synchronized BlockingQueue<Exchange> getQueue() {
- if (queue == null) {
- // prefer to lookup queue from component, so if this endpoint is
re-created or re-started
- // then the existing queue from the component can be used, so new
producers and consumers
- // can use the already existing queue referenced from the component
- if (getComponent() != null) {
- // use null to indicate default size (= use what the existing
queue has been configured with)
- Integer size = (getSize() == Integer.MAX_VALUE || getSize() ==
SedaConstants.QUEUE_SIZE) ? null : getSize();
- QueueReference ref = getComponent().getOrCreateQueue(this,
size, isMultipleConsumers(), queueFactory);
- queue = ref.getQueue();
- String key = getComponent().getQueueKey(getEndpointUri());
- LOG.debug("Endpoint {} is using shared queue: {} with size:
{}", this, key,
- ref.getSize() != null ? ref.getSize() :
Integer.MAX_VALUE);
- // and set the size we are using
- if (ref.getSize() != null) {
- setSize(ref.getSize());
+ public BlockingQueue<Exchange> getQueue() {
+ lock.lock();
+ try {
+ if (queue == null) {
+ // prefer to lookup queue from component, so if this endpoint
is re-created or re-started
+ // then the existing queue from the component can be used, so
new producers and consumers
+ // can use the already existing queue referenced from the
component
+ if (getComponent() != null) {
+ // use null to indicate default size (= use what the
existing queue has been configured with)
+ Integer size = (getSize() == Integer.MAX_VALUE ||
getSize() == SedaConstants.QUEUE_SIZE) ? null : getSize();
+ QueueReference ref = getComponent().getOrCreateQueue(this,
size, isMultipleConsumers(), queueFactory);
+ queue = ref.getQueue();
+ String key = getComponent().getQueueKey(getEndpointUri());
+ LOG.debug("Endpoint {} is using shared queue: {} with
size: {}", this, key,
+ ref.getSize() != null ? ref.getSize() :
Integer.MAX_VALUE);
+ // and set the size we are using
+ if (ref.getSize() != null) {
+ setSize(ref.getSize());
+ }
+ } else {
+ // fallback and create queue (as this endpoint has no
component)
+ queue = createQueue();
+ LOG.debug("Endpoint {} is using queue: {} with size: {}",
this, getEndpointUri(), getSize());
}
- } else {
- // fallback and create queue (as this endpoint has no
component)
- queue = createQueue();
- LOG.debug("Endpoint {} is using queue: {} with size: {}",
this, getEndpointUri(), getSize());
}
+ return queue;
+ } finally {
+ lock.unlock();
}
- return queue;
}
protected BlockingQueue<Exchange> createQueue() {
@@ -240,45 +245,55 @@ public class SedaEndpoint extends DefaultEndpoint
implements AsyncEndpoint, Brow
return null;
}
- protected synchronized AsyncProcessor getConsumerMulticastProcessor() {
- if (!multicastStarted && consumerMulticastProcessor != null) {
- // only start it on-demand to avoid starting it during stopping
- ServiceHelper.startService(consumerMulticastProcessor);
- multicastStarted = true;
+ protected AsyncProcessor getConsumerMulticastProcessor() {
+ lock.lock();
+ try {
+ if (!multicastStarted && consumerMulticastProcessor != null) {
+ // only start it on-demand to avoid starting it during stopping
+ ServiceHelper.startService(consumerMulticastProcessor);
+ multicastStarted = true;
+ }
+ return consumerMulticastProcessor;
+ } finally {
+ lock.unlock();
}
- return consumerMulticastProcessor;
}
- protected synchronized void updateMulticastProcessor() throws Exception {
- // only needed if we support multiple consumers
- if (!isMultipleConsumersSupported()) {
- return;
- }
-
- // stop old before we create a new
- if (consumerMulticastProcessor != null) {
- ServiceHelper.stopService(consumerMulticastProcessor);
- consumerMulticastProcessor = null;
- }
-
- int size = getConsumers().size();
- if (size >= 1) {
- if (multicastExecutor == null) {
- // create multicast executor as we need it when we have more
than 1 processor
- multicastExecutor =
getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this,
- URISupport.sanitizeUri(getEndpointUri()) +
"(multicast)");
+ protected void updateMulticastProcessor() throws Exception {
+ lock.lock();
+ try {
+ // only needed if we support multiple consumers
+ if (!isMultipleConsumersSupported()) {
+ return;
}
- // create list of consumers to multicast to
- List<Processor> processors = new ArrayList<>(size);
- for (SedaConsumer consumer : getConsumers()) {
- processors.add(consumer.getProcessor());
+
+ // stop old before we create a new
+ if (consumerMulticastProcessor != null) {
+ ServiceHelper.stopService(consumerMulticastProcessor);
+ consumerMulticastProcessor = null;
}
- // create multicast processor
- multicastStarted = false;
- consumerMulticastProcessor = (AsyncProcessor)
PluginHelper.getProcessorFactory(getCamelContext())
- .createProcessor(getCamelContext(), "MulticastProcessor",
- new Object[] { processors, multicastExecutor,
false });
+ int size = getConsumers().size();
+ if (size >= 1) {
+ if (multicastExecutor == null) {
+ // create multicast executor as we need it when we have
more than 1 processor
+ multicastExecutor =
getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this,
+ URISupport.sanitizeUri(getEndpointUri()) +
"(multicast)");
+ }
+ // create list of consumers to multicast to
+ List<Processor> processors = new ArrayList<>(size);
+ for (SedaConsumer consumer : getConsumers()) {
+ processors.add(consumer.getProcessor());
+ }
+ // create multicast processor
+ multicastStarted = false;
+
+ consumerMulticastProcessor = (AsyncProcessor)
PluginHelper.getProcessorFactory(getCamelContext())
+ .createProcessor(getCamelContext(),
"MulticastProcessor",
+ new Object[] { processors, multicastExecutor,
false });
+ }
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-servicenow/camel-servicenow-component/src/main/java/org/apache/camel/component/servicenow/auth/OAuthToken.java
b/components/camel-servicenow/camel-servicenow-component/src/main/java/org/apache/camel/component/servicenow/auth/OAuthToken.java
index f317cd47015..74959b9815d 100644
---
a/components/camel-servicenow/camel-servicenow-component/src/main/java/org/apache/camel/component/servicenow/auth/OAuthToken.java
+++
b/components/camel-servicenow/camel-servicenow-component/src/main/java/org/apache/camel/component/servicenow/auth/OAuthToken.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.servicenow.auth;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.component.servicenow.ServiceNowConfiguration;
import org.apache.cxf.jaxrs.client.WebClient;
@@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory;
public class OAuthToken {
private static final Logger LOGGER =
LoggerFactory.getLogger(OAuthToken.class);
+ private final Lock lock = new ReentrantLock();
private final ServiceNowConfiguration configuration;
private ClientAccessToken token;
private String authString;
@@ -42,54 +45,59 @@ public class OAuthToken {
this.expireAt = 0;
}
- private synchronized void getOrRefreshAccessToken() {
- if (token == null) {
- LOGGER.debug("Generate OAuth token");
-
- token = OAuthClientUtils.getAccessToken(
- WebClient.create(configuration.getOauthTokenUrl()),
- new Consumer(
- configuration.getOauthClientId(),
- configuration.getOauthClientSecret()),
- new ResourceOwnerGrant(
- configuration.getUserName(),
- configuration.getPassword()),
- true);
-
- LOGGER.debug("OAuth token expires in {}s", token.getExpiresIn());
-
- // Set expiration time related info in milliseconds
- token.setIssuedAt(System.currentTimeMillis());
-
token.setExpiresIn(TimeUnit.MILLISECONDS.convert(token.getExpiresIn(),
TimeUnit.SECONDS));
-
- authString = token.toString();
-
- if (token.getExpiresIn() > 0) {
- expireAt = token.getIssuedAt() + token.getExpiresIn();
- }
- } else if (expireAt > 0 && System.currentTimeMillis() >= expireAt) {
- LOGGER.debug("OAuth token is expired, refresh it");
-
- token = OAuthClientUtils.refreshAccessToken(
- WebClient.create(configuration.getOauthTokenUrl()),
- new Consumer(
- configuration.getOauthClientId(),
- configuration.getOauthClientSecret()),
- token,
- null,
- false);
-
- LOGGER.debug("Refreshed OAuth token expires in {}s",
token.getExpiresIn());
-
- // Set expiration time related info in milliseconds
- token.setIssuedAt(System.currentTimeMillis());
-
token.setExpiresIn(TimeUnit.MILLISECONDS.convert(token.getExpiresIn(),
TimeUnit.SECONDS));
-
- authString = token.toString();
-
- if (token.getExpiresIn() > 0) {
- expireAt = token.getIssuedAt() + token.getExpiresIn();
+ private void getOrRefreshAccessToken() {
+ lock.lock();
+ try {
+ if (token == null) {
+ LOGGER.debug("Generate OAuth token");
+
+ token = OAuthClientUtils.getAccessToken(
+ WebClient.create(configuration.getOauthTokenUrl()),
+ new Consumer(
+ configuration.getOauthClientId(),
+ configuration.getOauthClientSecret()),
+ new ResourceOwnerGrant(
+ configuration.getUserName(),
+ configuration.getPassword()),
+ true);
+
+ LOGGER.debug("OAuth token expires in {}s",
token.getExpiresIn());
+
+ // Set expiration time related info in milliseconds
+ token.setIssuedAt(System.currentTimeMillis());
+
token.setExpiresIn(TimeUnit.MILLISECONDS.convert(token.getExpiresIn(),
TimeUnit.SECONDS));
+
+ authString = token.toString();
+
+ if (token.getExpiresIn() > 0) {
+ expireAt = token.getIssuedAt() + token.getExpiresIn();
+ }
+ } else if (expireAt > 0 && System.currentTimeMillis() >= expireAt)
{
+ LOGGER.debug("OAuth token is expired, refresh it");
+
+ token = OAuthClientUtils.refreshAccessToken(
+ WebClient.create(configuration.getOauthTokenUrl()),
+ new Consumer(
+ configuration.getOauthClientId(),
+ configuration.getOauthClientSecret()),
+ token,
+ null,
+ false);
+
+ LOGGER.debug("Refreshed OAuth token expires in {}s",
token.getExpiresIn());
+
+ // Set expiration time related info in milliseconds
+ token.setIssuedAt(System.currentTimeMillis());
+
token.setExpiresIn(TimeUnit.MILLISECONDS.convert(token.getExpiresIn(),
TimeUnit.SECONDS));
+
+ authString = token.toString();
+
+ if (token.getExpiresIn() > 0) {
+ expireAt = token.getIssuedAt() + token.getExpiresIn();
+ }
}
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index 585482ce544..ea285912b88 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -134,14 +134,19 @@ public class SjmsComponent extends
HeaderFilterStrategyComponent {
super.doShutdown();
}
- protected synchronized ExecutorService getAsyncStartStopExecutorService() {
- if (asyncStartStopExecutorService == null) {
- // use a cached thread pool for async start tasks as they can run
for a while, and we need a dedicated thread
- // for each task, and the thread pool will shrink when no more
tasks running
- asyncStartStopExecutorService
- =
getCamelContext().getExecutorServiceManager().newCachedThreadPool(this,
"AsyncStartStopListener");
+ protected ExecutorService getAsyncStartStopExecutorService() {
+ lock.lock();
+ try {
+ if (asyncStartStopExecutorService == null) {
+ // use a cached thread pool for async start tasks as they can
run for a while, and we need a dedicated thread
+ // for each task, and the thread pool will shrink when no more
tasks running
+ asyncStartStopExecutorService
+ =
getCamelContext().getExecutorServiceManager().newCachedThreadPool(this,
"AsyncStartStopListener");
+ }
+ return asyncStartStopExecutorService;
+ } finally {
+ lock.unlock();
}
- return asyncStartStopExecutorService;
}
public void setConnectionFactory(ConnectionFactory connectionFactory) {
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 9f71a770abd..6ff8154ea8a 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -95,7 +95,8 @@ public class SjmsProducer extends DefaultAsyncProducer {
protected void initReplyManager() {
if (!started.get()) {
- synchronized (this) {
+ lock.lock();
+ try {
if (started.get()) {
return;
}
@@ -136,6 +137,8 @@ public class SjmsProducer extends DefaultAsyncProducer {
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
started.set(true);
+ } finally {
+ lock.unlock();
}
}
}
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
index 3aa5eb35349..63fe3dd1c33 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.sjms.consumer;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
@@ -66,6 +69,7 @@ public class EndpointMessageListener implements
SessionMessageListener {
private boolean eagerLoadingOfProperties;
private String eagerPoisonBody;
private volatile SjmsTemplate template;
+ private final Lock lock = new ReentrantLock();
public EndpointMessageListener(SjmsConsumer consumer, SjmsEndpoint
endpoint, Processor processor) {
this.consumer = consumer;
@@ -73,11 +77,16 @@ public class EndpointMessageListener implements
SessionMessageListener {
this.processor = AsyncProcessorConverterHelper.convert(processor);
}
- public synchronized SjmsTemplate getTemplate() {
- if (template == null) {
- template = endpoint.createInOnlyTemplate();
+ public SjmsTemplate getTemplate() {
+ lock.lock();
+ try {
+ if (template == null) {
+ template = endpoint.createInOnlyTemplate();
+ }
+ return template;
+ } finally {
+ lock.unlock();
}
- return template;
}
public void setTemplate(SjmsTemplate template) {
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
index 0a931703378..081e98e65a0 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.sjms.consumer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
@@ -54,10 +56,10 @@ public class SimpleMessageListenerContainer extends
ServiceSupport
private String destinationName;
private DestinationCreationStrategy destinationCreationStrategy;
- private final Object connectionLock = new Object();
+ private final Lock connectionLock = new ReentrantLock();
private Connection connection;
private volatile boolean connectionStarted;
- private final Object consumerLock = new Object();
+ private final Lock consumerLock = new ReentrantLock();
private Set<MessageConsumer> consumers;
private Set<Session> sessions;
private BackOffTimer.Task recoverTask;
@@ -181,9 +183,12 @@ public class SimpleMessageListenerContainer extends
ServiceSupport
}
}
- synchronized (this.connectionLock) {
+ connectionLock.lock();
+ try {
this.sessions = null;
this.consumers = null;
+ } finally {
+ connectionLock.unlock();
}
scheduleConnectionRecovery();
}
@@ -240,7 +245,8 @@ public class SimpleMessageListenerContainer extends
ServiceSupport
}
protected void initConsumers() throws Exception {
- synchronized (this.consumerLock) {
+ consumerLock.lock();
+ try {
if (consumers == null) {
LOG.debug("Initializing {} concurrent consumers as JMS
listener on destination: {}", concurrentConsumers,
destinationName);
@@ -254,6 +260,8 @@ public class SimpleMessageListenerContainer extends
ServiceSupport
consumers.add(consumer);
}
}
+ } finally {
+ consumerLock.unlock();
}
}
@@ -266,7 +274,8 @@ public class SimpleMessageListenerContainer extends
ServiceSupport
}
protected void stopConsumers() {
- synchronized (this.consumerLock) {
+ consumerLock.lock();
+ try {
if (consumers != null) {
LOG.debug("Stopping JMS MessageConsumers");
for (MessageConsumer consumer : this.consumers) {
@@ -279,11 +288,14 @@ public class SimpleMessageListenerContainer extends
ServiceSupport
}
}
}
+ } finally {
+ consumerLock.unlock();
}
}
protected void createConnection() throws Exception {
- synchronized (this.connectionLock) {
+ connectionLock.lock();
+ try {
if (this.connection == null) {
Connection con = null;
try {
@@ -300,22 +312,28 @@ public class SimpleMessageListenerContainer extends
ServiceSupport
this.connection = con;
LOG.debug("Created JMS Connection");
}
+ } finally {
+ connectionLock.unlock();
}
}
protected final void refreshConnection() throws Exception {
- synchronized (this.connectionLock) {
+ connectionLock.lock();
+ try {
closeConnection(connection);
this.connection = null;
createConnection();
if (this.connectionStarted) {
startConnection();
}
+ } finally {
+ connectionLock.unlock();
}
}
protected void startConnection() throws Exception {
- synchronized (this.connectionLock) {
+ connectionLock.lock();
+ try {
this.connectionStarted = true;
if (this.connection != null) {
try {
@@ -324,11 +342,14 @@ public class SimpleMessageListenerContainer extends
ServiceSupport
// ignore as it may already be started
}
}
+ } finally {
+ connectionLock.unlock();
}
}
protected void stopConnection() {
- synchronized (this.connectionLock) {
+ connectionLock.lock();
+ try {
this.connectionStarted = false;
if (this.connection != null) {
try {
@@ -337,6 +358,8 @@ public class SimpleMessageListenerContainer extends
ServiceSupport
LOG.debug("Error stopping connection. This exception is
ignored.", e);
}
}
+ } finally {
+ connectionLock.unlock();
}
}
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/MessageSelectorCreator.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/MessageSelectorCreator.java
index f7aa5899f38..59f8443fd93 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/MessageSelectorCreator.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/MessageSelectorCreator.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.sjms.reply;
import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.TimeoutMap;
import org.slf4j.Logger;
@@ -32,6 +34,7 @@ public class MessageSelectorCreator {
protected static final Logger LOG =
LoggerFactory.getLogger(MessageSelectorCreator.class);
protected final TimeoutMap<String, ?> timeoutMap;
protected final ConcurrentSkipListSet<String> correlationIds;
+ private final Lock lock = new ReentrantLock();
protected volatile boolean dirty = true;
protected StringBuilder expression;
@@ -44,34 +47,39 @@ public class MessageSelectorCreator {
this.correlationIds = new ConcurrentSkipListSet<>();
}
- public synchronized String get() {
- if (!dirty) {
- return expression.toString();
- }
+ public String get() {
+ lock.lock();
+ try {
+ if (!dirty) {
+ return expression.toString();
+ }
- expression = new StringBuilder(256);
+ expression = new StringBuilder(256);
- expression.append("JMSCorrelationID='");
- if (correlationIds.isEmpty()) {
- // no id's so use a dummy to select nothing
- expression.append("CamelDummyJmsMessageSelector'");
- } else {
- boolean first = true;
- for (String value : correlationIds) {
- if (!first) {
- expression.append(" OR JMSCorrelationID='");
- }
- expression.append(value).append("'");
- if (first) {
- first = false;
+ expression.append("JMSCorrelationID='");
+ if (correlationIds.isEmpty()) {
+ // no id's so use a dummy to select nothing
+ expression.append("CamelDummyJmsMessageSelector'");
+ } else {
+ boolean first = true;
+ for (String value : correlationIds) {
+ if (!first) {
+ expression.append(" OR JMSCorrelationID='");
+ }
+ expression.append(value).append("'");
+ if (first) {
+ first = false;
+ }
}
}
- }
- String answer = expression.toString();
+ String answer = expression.toString();
- dirty = false;
- return answer;
+ dirty = false;
+ return answer;
+ } finally {
+ lock.unlock();
+ }
}
// Changes to live correlation-ids invalidate existing message selector
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
index 662f40fa6cc..c92ba21e516 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
@@ -73,12 +73,15 @@ public class QueueReplyManager extends ReplyManagerSupport {
@Override
public Destination createDestination(Session session, String
destinationName, boolean topic) throws JMSException {
- synchronized (QueueReplyManager.this) {
+ QueueReplyManager.this.lock.lock();
+ try {
// resolve the reply to destination
if (destination == null) {
destination = delegate.createDestination(session,
destinationName, topic);
setReplyTo(destination);
}
+ } finally {
+ QueueReplyManager.this.lock.unlock();
}
return destination;
}
diff --git
a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConnectionFactory.java
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConnectionFactory.java
index 4f13a2d5ff3..c9119d3b4b0 100644
---
a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConnectionFactory.java
+++
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConnectionFactory.java
@@ -21,6 +21,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import com.splunk.HttpService;
import com.splunk.SSLSecurityProtocol;
@@ -45,6 +47,7 @@ public class SplunkConnectionFactory {
private boolean useSunHttpsHandler;
private SSLSecurityProtocol sslProtocol;
private boolean validateCertificates;
+ private final Lock lock = new ReentrantLock();
public SplunkConnectionFactory(final String host, final int port, final
String username, final String password) {
this.host = host;
@@ -113,81 +116,86 @@ public class SplunkConnectionFactory {
this.token = token;
}
- public synchronized Service createService(CamelContext camelContext) {
- final ServiceArgs args = new ServiceArgs();
- if (host != null) {
- args.setHost(host);
- }
- if (port > 0) {
- args.setPort(port);
- }
- if (scheme != null) {
- args.setScheme(scheme);
- }
- if (app != null) {
- args.setApp(app);
- }
- if (owner != null) {
- args.setOwner(owner);
- }
- if (username != null) {
- args.setUsername(username);
- }
- if (password != null && token == null) {
- args.setPassword(password);
- }
- if (token != null) {
- args.setToken(String.format("Bearer %s", token));
- args.remove("username");
- args.remove("password");
- }
- // useful in cases where you want to bypass app. servers https handling
- // (wls i'm looking at you)
- if (isUseSunHttpsHandler()) {
- String sunHandlerClassName = "sun.net.www.protocol.https.Handler";
- Class<URLStreamHandler> clazz
- =
camelContext.getClassResolver().resolveClass(sunHandlerClassName,
URLStreamHandler.class);
- if (clazz != null) {
- URLStreamHandler handler =
camelContext.getInjector().newInstance(clazz);
- args.setHTTPSHandler(handler);
- LOG.debug("using the URLStreamHandler {} for {}", handler,
args);
- } else {
- LOG.warn("could not resolve and use the URLStreamHandler class
'{}'", sunHandlerClassName);
+ public Service createService(CamelContext camelContext) {
+ lock.lock();
+ try {
+ final ServiceArgs args = new ServiceArgs();
+ if (host != null) {
+ args.setHost(host);
}
- }
-
- ExecutorService executor
- =
camelContext.getExecutorServiceManager().newSingleThreadExecutor(this,
"DefaultSplunkConnectionFactory");
-
- Future<Service> future = executor.submit(new Callable<Service>() {
- public Service call() throws Exception {
- if (Service.DEFAULT_SCHEME.equals(getScheme())) {
- LOG.debug("Https in use. Setting SSL protocol to {} and
sertificate validation to %s", getSslProtocol(),
- isValidateCertificates());
-
HttpService.setValidateCertificates(isValidateCertificates());
- HttpService.setSslSecurityProtocol(getSslProtocol());
+ if (port > 0) {
+ args.setPort(port);
+ }
+ if (scheme != null) {
+ args.setScheme(scheme);
+ }
+ if (app != null) {
+ args.setApp(app);
+ }
+ if (owner != null) {
+ args.setOwner(owner);
+ }
+ if (username != null) {
+ args.setUsername(username);
+ }
+ if (password != null && token == null) {
+ args.setPassword(password);
+ }
+ if (token != null) {
+ args.setToken(String.format("Bearer %s", token));
+ args.remove("username");
+ args.remove("password");
+ }
+ // useful in cases where you want to bypass app. servers https
handling
+ // (wls i'm looking at you)
+ if (isUseSunHttpsHandler()) {
+ String sunHandlerClassName =
"sun.net.www.protocol.https.Handler";
+ Class<URLStreamHandler> clazz
+ =
camelContext.getClassResolver().resolveClass(sunHandlerClassName,
URLStreamHandler.class);
+ if (clazz != null) {
+ URLStreamHandler handler =
camelContext.getInjector().newInstance(clazz);
+ args.setHTTPSHandler(handler);
+ LOG.debug("using the URLStreamHandler {} for {}", handler,
args);
+ } else {
+ LOG.warn("could not resolve and use the URLStreamHandler
class '{}'", sunHandlerClassName);
}
- return Service.connect(args);
}
- });
- try {
- Service service = null;
- if (connectionTimeout > 0) {
- service = future.get(connectionTimeout, TimeUnit.MILLISECONDS);
- } else {
- service = future.get();
+
+ ExecutorService executor
+ =
camelContext.getExecutorServiceManager().newSingleThreadExecutor(this,
"DefaultSplunkConnectionFactory");
+
+ Future<Service> future = executor.submit(new Callable<Service>() {
+ public Service call() throws Exception {
+ if (Service.DEFAULT_SCHEME.equals(getScheme())) {
+ LOG.debug("Https in use. Setting SSL protocol to {}
and sertificate validation to %s", getSslProtocol(),
+ isValidateCertificates());
+
HttpService.setValidateCertificates(isValidateCertificates());
+ HttpService.setSslSecurityProtocol(getSslProtocol());
+ }
+ return Service.connect(args);
+ }
+ });
+ try {
+ Service service = null;
+ if (connectionTimeout > 0) {
+ service = future.get(connectionTimeout,
TimeUnit.MILLISECONDS);
+ } else {
+ service = future.get();
+ }
+ LOG.info("Successfully connected to Splunk");
+ return service;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ String.format("could not connect to Splunk Server @
%s:%d - %s", host, port, e.getMessage()), e);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("could not connect to Splunk Server @
%s:%d - %s", host, port, e.getMessage()), e);
+ } finally {
+ camelContext.getExecutorServiceManager().shutdownNow(executor);
}
- LOG.info("Successfully connected to Splunk");
- return service;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(
- String.format("could not connect to Splunk Server @ %s:%d
- %s", host, port, e.getMessage()), e);
- } catch (Exception e) {
- throw new RuntimeException(
- String.format("could not connect to Splunk Server @ %s:%d
- %s", host, port, e.getMessage()), e);
} finally {
- camelContext.getExecutorServiceManager().shutdownNow(executor);
+ lock.unlock();
}
}
}
diff --git
a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java
index ec90916e8b1..1fe381a9138 100644
---
a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java
+++
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java
@@ -130,14 +130,19 @@ public class SplunkEndpoint extends ScheduledPollEndpoint
implements EndpointSer
return configuration;
}
- public synchronized boolean reset(Exception e) {
- boolean answer = false;
- if (e instanceof RuntimeException && e.getCause() instanceof
ConnectException
- || e instanceof SocketException || e instanceof SSLException) {
- LOG.warn("Got exception from Splunk. Service will be reset.");
- this.service = null;
- answer = true;
+ public boolean reset(Exception e) {
+ lock.lock();
+ try {
+ boolean answer = false;
+ if (e instanceof RuntimeException && e.getCause() instanceof
ConnectException
+ || e instanceof SocketException || e instanceof
SSLException) {
+ LOG.warn("Got exception from Splunk. Service will be reset.");
+ this.service = null;
+ answer = true;
+ }
+ return answer;
+ } finally {
+ lock.unlock();
}
- return answer;
}
}
diff --git
a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
index f131fa14d6d..36b8d28aaf2 100644
---
a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
+++
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java
@@ -22,6 +22,8 @@ import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import com.splunk.Args;
import com.splunk.Service;
@@ -37,6 +39,7 @@ public abstract class SplunkDataWriter implements DataWriter {
protected Args args;
private boolean connected;
private Socket socket;
+ protected final Lock lock = new ReentrantLock();
public SplunkDataWriter(SplunkEndpoint endpoint, Args args) {
this.endpoint = endpoint;
@@ -55,27 +58,36 @@ public abstract class SplunkDataWriter implements
DataWriter {
doWrite(event + SplunkEvent.LINEBREAK);
}
- protected synchronized void doWrite(String event) throws IOException {
- LOG.debug("writing event to splunk:{}", event);
- OutputStream ostream = socket.getOutputStream();
- Writer writer = new OutputStreamWriter(ostream,
StandardCharsets.UTF_8);
- writer.write(event);
- writer.flush();
+ protected void doWrite(String event) throws IOException {
+ lock.lock();
+ try {
+ LOG.debug("writing event to splunk:{}", event);
+ OutputStream ostream = socket.getOutputStream();
+ Writer writer = new OutputStreamWriter(ostream,
StandardCharsets.UTF_8);
+ writer.write(event);
+ writer.flush();
+ } finally {
+ lock.unlock();
+ }
}
@Override
- public synchronized void start() {
+ public void start() {
+ lock.lock();
try {
socket = createSocket(endpoint.getService());
connected = true;
} catch (Exception e) {
connected = false;
throw new RuntimeException(e);
+ } finally {
+ lock.unlock();
}
}
@Override
- public synchronized void stop() {
+ public void stop() {
+ lock.lock();
try {
if (socket != null) {
socket.close();
@@ -83,6 +95,8 @@ public abstract class SplunkDataWriter implements DataWriter {
}
} catch (Exception e) {
throw new RuntimeException(e);
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
index f9587b080c1..1d773ed8400 100644
---
a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
+++
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java
@@ -33,13 +33,18 @@ public class SubmitDataWriter extends SplunkDataWriter {
}
@Override
- protected synchronized void doWrite(String event) throws IOException {
- Index index = getIndex();
- if (index != null) {
- index.submit(args, event);
- } else {
- Receiver receiver = endpoint.getService().getReceiver();
- receiver.submit(args, event);
+ protected void doWrite(String event) throws IOException {
+ lock.lock();
+ try {
+ Index index = getIndex();
+ if (index != null) {
+ index.submit(args, event);
+ } else {
+ Receiver receiver = endpoint.getService().getReceiver();
+ receiver.submit(args, event);
+ }
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
index 2dda0bd9ec2..b16969f2b8e 100644
---
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
+++
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.springrabbit;
import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import com.rabbitmq.client.Channel;
import org.apache.camel.AsyncCallback;
@@ -47,6 +49,7 @@ public class EndpointMessageListener implements
ChannelAwareMessageListener {
private RabbitTemplate template;
private boolean disableReplyTo;
private boolean async;
+ private final Lock lock = new ReentrantLock();
public EndpointMessageListener(SpringRabbitMQConsumer consumer,
SpringRabbitMQEndpoint endpoint, Processor processor) {
this.consumer = consumer;
@@ -76,11 +79,16 @@ public class EndpointMessageListener implements
ChannelAwareMessageListener {
this.disableReplyTo = disableReplyTo;
}
- public synchronized RabbitTemplate getTemplate() {
- if (template == null) {
- template = endpoint.createInOnlyTemplate();
+ public RabbitTemplate getTemplate() {
+ lock.lock();
+ try {
+ if (template == null) {
+ template = endpoint.createInOnlyTemplate();
+ }
+ return template;
+ } finally {
+ lock.unlock();
}
- return template;
}
public void setTemplate(RabbitTemplate template) {
diff --git
a/components/camel-spring-security/src/main/java/org/apache/camel/component/spring/security/SpringSecurityAuthorizationPolicy.java
b/components/camel-spring-security/src/main/java/org/apache/camel/component/spring/security/SpringSecurityAuthorizationPolicy.java
index 52a53292e00..59548292cec 100644
---
a/components/camel-spring-security/src/main/java/org/apache/camel/component/spring/security/SpringSecurityAuthorizationPolicy.java
+++
b/components/camel-spring-security/src/main/java/org/apache/camel/component/spring/security/SpringSecurityAuthorizationPolicy.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.spring.security;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import javax.security.auth.Subject;
import org.apache.camel.CamelAuthorizationException;
@@ -48,10 +51,11 @@ public class SpringSecurityAuthorizationPolicy extends
IdentifiedType
private static final Logger LOG =
LoggerFactory.getLogger(SpringSecurityAuthorizationPolicy.class);
private AuthorizationManager<Exchange> authorizationManager;
private AuthenticationManager authenticationManager;
- private AuthenticationAdapter authenticationAdapter;
+ private volatile AuthenticationAdapter authenticationAdapter;
private ApplicationEventPublisher eventPublisher;
private boolean alwaysReauthenticate;
private boolean useThreadSecurityContext = true;
+ private final Lock lock = new ReentrantLock();
@Override
public void beforeWrap(Route route, NamedNode definition) {
@@ -145,16 +149,20 @@ public class SpringSecurityAuthorizationPolicy extends
IdentifiedType
}
public AuthenticationAdapter getAuthenticationAdapter() {
- if (authenticationAdapter == null) {
- synchronized (this) {
- if (authenticationAdapter != null) {
- return authenticationAdapter;
- } else {
- authenticationAdapter = new DefaultAuthenticationAdapter();
+ AuthenticationAdapter adapter = authenticationAdapter;
+ if (adapter == null) {
+ lock.lock();
+ try {
+ adapter = authenticationAdapter;
+ if (adapter == null) {
+ adapter = new DefaultAuthenticationAdapter();
+ authenticationAdapter = adapter;
}
+ } finally {
+ lock.unlock();
}
}
- return authenticationAdapter;
+ return adapter;
}
public void setAuthenticationAdapter(AuthenticationAdapter adapter) {
diff --git
a/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceProducer.java
b/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceProducer.java
index 7ad8f5b78ec..0c210e330b4 100644
---
a/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceProducer.java
+++
b/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceProducer.java
@@ -21,6 +21,8 @@ import java.net.HttpURLConnection;
import java.net.URI;
import java.security.GeneralSecurityException;
import java.util.Iterator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
@@ -213,6 +215,7 @@ public class SpringWebserviceProducer extends
DefaultProducer {
private final AbstractHttpWebServiceMessageSender delegate;
private final SpringWebserviceConfiguration configuration;
private final CamelContext camelContext;
+ private final Lock lock = new ReentrantLock();
private SSLContext sslContext;
@@ -235,14 +238,15 @@ public class SpringWebserviceProducer extends
DefaultProducer {
}
if (configuration.getSslContextParameters() != null &&
connection instanceof HttpsURLConnection) {
+ lock.lock();
try {
- synchronized (this) {
- if (sslContext == null) {
- sslContext =
configuration.getSslContextParameters().createSSLContext(camelContext);
- }
+ if (sslContext == null) {
+ sslContext =
configuration.getSslContextParameters().createSSLContext(camelContext);
}
} catch (GeneralSecurityException e) {
throw new RuntimeCamelException("Error creating
SSLContext based on SSLContextParameters.", e);
+ } finally {
+ lock.unlock();
}
((HttpsURLConnection)
connection).setSSLSocketFactory(sslContext.getSocketFactory());
diff --git
a/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java
b/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java
index 1e8ef97b1d6..667035bf135 100644
---
a/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java
+++
b/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java
@@ -121,14 +121,24 @@ public class EventEndpoint extends DefaultEndpoint
implements ApplicationContext
// Implementation methods
//
-------------------------------------------------------------------------
- public synchronized void consumerStarted(EventConsumer consumer) {
- getComponent().consumerStarted(this);
- getLoadBalancer().addProcessor(consumer.getAsyncProcessor());
+ public void consumerStarted(EventConsumer consumer) {
+ lock.lock();
+ try {
+ getComponent().consumerStarted(this);
+ getLoadBalancer().addProcessor(consumer.getAsyncProcessor());
+ } finally {
+ lock.unlock();
+ }
}
- public synchronized void consumerStopped(EventConsumer consumer) {
- getComponent().consumerStopped(this);
- getLoadBalancer().removeProcessor(consumer.getAsyncProcessor());
+ public void consumerStopped(EventConsumer consumer) {
+ lock.lock();
+ try {
+ getComponent().consumerStopped(this);
+ getLoadBalancer().removeProcessor(consumer.getAsyncProcessor());
+ } finally {
+ lock.unlock();
+ }
}
protected LoadBalancer createLoadBalancer() {
diff --git
a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java
b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java
index 6d5199bcb17..a45914ddc73 100644
---
a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java
+++
b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java
@@ -191,29 +191,34 @@ public class TransactionErrorHandlerReifier extends
ErrorHandlerReifier<SpringTr
return answer;
}
- protected synchronized ScheduledExecutorService getExecutorService(
+ protected ScheduledExecutorService getExecutorService(
ScheduledExecutorService executorService, String
executorServiceRef) {
- if (executorService == null || executorService.isShutdown()) {
- // camel context will shutdown the executor when it shutdown so no
- // need to shut it down when stopping
- if (executorServiceRef != null) {
- executorService = lookupByNameAndType(executorServiceRef,
ScheduledExecutorService.class);
- if (executorService == null) {
- ExecutorServiceManager manager =
camelContext.getExecutorServiceManager();
- ThreadPoolProfile profile =
manager.getThreadPoolProfile(executorServiceRef);
- executorService = manager.newScheduledThreadPool(this,
executorServiceRef, profile);
+ lock.lock();
+ try {
+ if (executorService == null || executorService.isShutdown()) {
+ // camel context will shutdown the executor when it shutdown
so no
+ // need to shut it down when stopping
+ if (executorServiceRef != null) {
+ executorService = lookupByNameAndType(executorServiceRef,
ScheduledExecutorService.class);
+ if (executorService == null) {
+ ExecutorServiceManager manager =
camelContext.getExecutorServiceManager();
+ ThreadPoolProfile profile =
manager.getThreadPoolProfile(executorServiceRef);
+ executorService = manager.newScheduledThreadPool(this,
executorServiceRef, profile);
+ }
+ if (executorService == null) {
+ throw new IllegalArgumentException("ExecutorService "
+ executorServiceRef + " not found in registry.");
+ }
+ } else {
+ // no explicit configured thread pool, so leave it up to
the
+ // error handler to decide if it need a default thread
pool from
+ // CamelContext#getErrorHandlerExecutorService
+ executorService = null;
}
- if (executorService == null) {
- throw new IllegalArgumentException("ExecutorService " +
executorServiceRef + " not found in registry.");
- }
- } else {
- // no explicit configured thread pool, so leave it up to the
- // error handler to decide if it need a default thread pool
from
- // CamelContext#getErrorHandlerExecutorService
- executorService = null;
}
+ return executorService;
+ } finally {
+ lock.unlock();
}
- return executorService;
}
}
diff --git
a/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXJAXBIteratorExpression.java
b/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXJAXBIteratorExpression.java
index 40f265349b4..f9b785d8c78 100644
---
a/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXJAXBIteratorExpression.java
+++
b/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXJAXBIteratorExpression.java
@@ -101,16 +101,17 @@ public class StAXJAXBIteratorExpression<T> extends
ExpressionAdapter {
}
private static JAXBContext jaxbContext(Class<?> handled) throws
JAXBException {
- if (JAX_CONTEXTS.containsKey(handled)) {
- return JAX_CONTEXTS.get(handled);
- }
-
- JAXBContext context;
- synchronized (JAX_CONTEXTS) {
- context = JAXBContext.newInstance(handled);
- JAX_CONTEXTS.put(handled, context);
+ try {
+ return JAX_CONTEXTS.computeIfAbsent(handled, k -> {
+ try {
+ return JAXBContext.newInstance(handled);
+ } catch (JAXBException e) {
+ throw new RuntimeCamelException(e);
+ }
+ });
+ } catch (RuntimeCamelException e) {
+ throw (JAXBException) e.getCause();
}
- return context;
}
@Override
diff --git
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
index 3ab6db20563..8739b8ec5a2 100644
---
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
+++
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
@@ -335,43 +335,53 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
/**
* Strategy method for processing the line
*/
- protected synchronized long processLine(String line, boolean last, long
index) throws Exception {
- if (endpoint.getGroupLines() > 0) {
- // remember line
- if (line != null) {
- lines.add(line);
- }
+ protected long processLine(String line, boolean last, long index) throws
Exception {
+ lock.lock();
+ try {
+ if (endpoint.getGroupLines() > 0) {
+ // remember line
+ if (line != null) {
+ lines.add(line);
+ }
- // should we flush lines?
- if (!lines.isEmpty() && (lines.size() >= endpoint.getGroupLines()
|| last)) {
- // spit out lines as we hit the size, or it was the last
- List<String> copy = new ArrayList<>(lines);
- Object body = endpoint.getGroupStrategy().groupLines(copy);
- // remember to inc index when we create an exchange
- Exchange exchange = createExchange(body, index++, last);
+ // should we flush lines?
+ if (!lines.isEmpty() && (lines.size() >=
endpoint.getGroupLines() || last)) {
+ // spit out lines as we hit the size, or it was the last
+ List<String> copy = new ArrayList<>(lines);
+ Object body = endpoint.getGroupStrategy().groupLines(copy);
+ // remember to inc index when we create an exchange
+ Exchange exchange = createExchange(body, index++, last);
- // clear lines
- lines.clear();
+ // clear lines
+ lines.clear();
+ getProcessor().process(exchange);
+ }
+ } else if (line != null) {
+ // single line
+ // remember to inc index when we create an exchange
+ Exchange exchange = createExchange(line, index++, last);
getProcessor().process(exchange);
}
- } else if (line != null) {
- // single line
- // remember to inc index when we create an exchange
- Exchange exchange = createExchange(line, index++, last);
- getProcessor().process(exchange);
- }
- return index;
+ return index;
+ } finally {
+ lock.unlock();
+ }
}
/**
* Strategy method for processing the data
*/
- protected synchronized long processRaw(byte[] body, long index) throws
Exception {
- Exchange exchange = createExchange(body, index++, true);
- getProcessor().process(exchange);
- return index;
+ protected long processRaw(byte[] body, long index) throws Exception {
+ lock.lock();
+ try {
+ Exchange exchange = createExchange(body, index++, true);
+ getProcessor().process(exchange);
+ return index;
+ } finally {
+ lock.unlock();
+ }
}
/**
diff --git
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
index 77b0e7d5298..9cc2328511b 100644
---
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
+++
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
@@ -69,14 +69,16 @@ public class StreamProducer extends DefaultAsyncProducer {
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
delay(endpoint.getDelay());
-
- synchronized (this) {
+ lock.lock();
+ try {
try {
openStream(exchange);
writeToStream(outputStream, exchange);
} finally {
closeStream(exchange, false);
}
+ } finally {
+ lock.unlock();
}
} catch (InterruptedException e) {
exchange.setException(e);