This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch jms-inout
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/jms-inout by this push:
new 5edc42ce7ef camel-jms - Using in/out should use single atomic
operation with the timeout map for the correlation id. Could help with
org.apache.camel.component.jms.JmsProducerConcurrentWithReplyTest.testConcurrentProducers
5edc42ce7ef is described below
commit 5edc42ce7ef3e4e1b5a291be43922ad38ffd26fa
Author: Claus Ibsen <[email protected]>
AuthorDate: Sat Mar 23 18:00:21 2024 +0100
camel-jms - Using in/out should use single atomic operation with the
timeout map for the correlation id. Could help with
org.apache.camel.component.jms.JmsProducerConcurrentWithReplyTest.testConcurrentProducers
---
.../apache/camel/component/jms/reply/ReplyManagerSupport.java | 10 +++++-----
.../apache/camel/component/sjms/reply/QueueReplyManager.java | 5 +----
.../camel/component/sjms/reply/TemporaryQueueReplyManager.java | 3 +--
.../main/java/org/apache/camel/support/DefaultTimeoutMap.java | 5 +++++
4 files changed, 12 insertions(+), 11 deletions(-)
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index b8e12e72078..31234500eee 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -231,13 +231,13 @@ public abstract class ReplyManagerSupport extends
ServiceSupport implements Repl
/**
* <b>IMPORTANT:</b> This logic is only being used due to high performance
in-memory only testing using InOut over
- * JMS. Its unlikely to happen in a real life situation with communication
to a remote broker, which always will be
- * slower to send back reply, before Camel had a chance to update it's
internal correlation map.
+ * JMS. It is unlikely to happen in a real life situation with
communication to a remote broker, which always will
+ * be slower to send back reply, before Camel had a chance to update the
internal correlation map.
*/
protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String
correlationID, Message message) {
// race condition, when using messageID as correlationID then we store
a provisional correlation id
// at first, which gets updated with the JMSMessageID after the
message has been sent. And in the unlikely
- // event that the reply comes back really really fast, and the
correlation map hasn't yet been updated
+ // event that the reply comes back really fast, and the correlation
map hasn't yet been updated
// from the provisional id to the JMSMessageID. If so we have to wait
a bit and lookup again.
if (log.isWarnEnabled()) {
log.warn("Early reply received with correlationID [{}] -> {}",
correlationID, message);
@@ -255,8 +255,8 @@ public abstract class ReplyManagerSupport extends
ServiceSupport implements Repl
}
private ReplyHandler getReplyHandler(String correlationID) {
- log.trace("Early reply not found handler. Waiting a bit longer.");
- return correlation.get(correlationID);
+ log.trace("Early reply not found. Waiting a bit longer.");
+ return correlation.remove(correlationID); // get and remove
}
@Override
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
index fb67ba02739..662f40fa6cc 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
@@ -45,16 +45,13 @@ public class QueueReplyManager extends ReplyManagerSupport {
// should not happen that we can't find the handler
return;
}
-
correlation.put(newCorrelationId, handler, requestTimeout);
}
@Override
protected void handleReplyMessage(String correlationID, Message message,
Session session) {
- ReplyHandler handler = correlation.get(correlationID);
-
+ ReplyHandler handler = correlation.remove(correlationID);
if (handler != null) {
- correlation.remove(correlationID);
handler.onReply(correlationID, message, session);
} else {
// we could not correlate the received reply message to a matching
request and therefore
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java
index 6c3ce420c1e..98553e9a23a 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java
@@ -63,9 +63,8 @@ public class TemporaryQueueReplyManager extends
ReplyManagerSupport {
@Override
protected void handleReplyMessage(String correlationID, Message message,
Session session) {
- ReplyHandler handler = correlation.get(correlationID);
+ ReplyHandler handler = correlation.remove(correlationID);
if (handler != null) {
- correlation.remove(correlationID);
handler.onReply(correlationID, message, session);
} else {
// we could not correlate the received reply message to a matching
request and therefore
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java
index 5be87e40ef0..b6cda4f3ecc 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java
@@ -134,6 +134,11 @@ public class DefaultTimeoutMap<K, V> extends
ServiceSupport implements TimeoutMa
@Override
public V remove(K key) {
+ // if no contains, the lock is not necessary
+ if (!map.containsKey(key)) {
+ return null;
+ }
+
V value = null;
lock.lock();
try {