This is an automated email from the ASF dual-hosted git repository.

dmvolod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new f3a1cb5  CAMEL-12869: ReplyTo destination must match endpoint type 
(topic or queue) that the message is sent on
f3a1cb5 is described below

commit f3a1cb5492d897b24b45c8e9bd583f5d4259313b
Author: Dmitry Volodin <[email protected]>
AuthorDate: Thu Nov 15 18:51:21 2018 +0300

    CAMEL-12869: ReplyTo destination must match endpoint type (topic or
    queue) that the message is sent on
---
 components/camel-sjms/src/main/docs/sjms-component.adoc     |  2 +-
 .../java/org/apache/camel/component/sjms/SjmsEndpoint.java  |  2 ++
 .../camel/component/sjms/jms/DestinationNameParser.java     | 13 +++++++++++++
 .../apache/camel/component/sjms/producer/InOutProducer.java | 11 ++++++++---
 4 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/components/camel-sjms/src/main/docs/sjms-component.adoc 
b/components/camel-sjms/src/main/docs/sjms-component.adoc
index 6e9ad12..bff4665 100644
--- a/components/camel-sjms/src/main/docs/sjms-component.adoc
+++ b/components/camel-sjms/src/main/docs/sjms-component.adoc
@@ -150,7 +150,7 @@ with the following path and query parameters:
 | *exceptionHandler* (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
option is not in use. By default the consumer will deal with exceptions, that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | *messageSelector* (consumer) | Sets the JMS Message selector syntax. |  | 
String
-| *namedReplyTo* (producer) | Sets the reply to destination name used for 
InOut producer endpoints. |  | String
+| *namedReplyTo* (producer) | Sets the reply to destination name used for 
InOut producer endpoints. The type of the reply to destination can be 
determined by the starting prefix (topic: or queue:) in its name. |  | String
 | *persistent* (producer) | Flag used to enable/disable message persistence. | 
true | boolean
 | *producerCount* (producer) | Sets the number of producers used for this 
endpoint. | 1 | int
 | *ttl* (producer) | Flag used to adjust the Time To Live value of produced 
messages. | -1 | long
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index c73ebb0..faa01d0 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -563,6 +563,8 @@ public class SjmsEndpoint extends DefaultEndpoint 
implements AsyncEndpoint, Mult
 
     /**
      * Sets the reply to destination name used for InOut producer endpoints.
+     * The type of the reply to destination can be determined by the starting 
+     * prefix (topic: or queue:) in its name. 
      */
     public void setNamedReplyTo(String namedReplyTo) {
         this.namedReplyTo = namedReplyTo;
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
index 41eab2d..095d1c9 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
@@ -27,6 +27,19 @@ public class DestinationNameParser {
         }
         return destinationName.startsWith("topic:");
     }
+    
+    public boolean isNamedReplyToTopic(String namedReplyTo, boolean 
isDestinationTopic) {
+        if (namedReplyTo == null) {
+            throw new IllegalArgumentException("namedReplyTo is null");
+        }
+        if (namedReplyTo.startsWith("topic:")) {
+            return true;
+        } else if (namedReplyTo.startsWith("queue:")) {
+            return false;
+        } else {
+            return isDestinationTopic;
+        }
+    }
 
     public String getShortName(String destinationName) {
         if (destinationName == null) {
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index 1be3630..53be09e 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -38,6 +38,7 @@ import org.apache.camel.component.sjms.SjmsEndpoint;
 import org.apache.camel.component.sjms.SjmsMessage;
 import org.apache.camel.component.sjms.SjmsProducer;
 import org.apache.camel.component.sjms.jms.ConnectionResource;
+import org.apache.camel.component.sjms.jms.DestinationNameParser;
 import org.apache.camel.component.sjms.jms.JmsConstants;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.spi.UuidGenerator;
@@ -87,12 +88,16 @@ public class InOutProducer extends SjmsProducer {
                 }
 
                 Destination replyToDestination;
+                boolean isReplyToTopic = false;
                 if (ObjectHelper.isEmpty(getNamedReplyTo())) {
-                    replyToDestination = 
getEndpoint().getDestinationCreationStrategy().createTemporaryDestination(session,
 isTopic());
+                    isReplyToTopic = isTopic();
+                    replyToDestination = 
getEndpoint().getDestinationCreationStrategy().createTemporaryDestination(session,
 isReplyToTopic);
                 } else {
-                    replyToDestination = 
getEndpoint().getDestinationCreationStrategy().createDestination(session, 
getNamedReplyTo(), isTopic());
+                    DestinationNameParser parser = new DestinationNameParser();
+                    isReplyToTopic = 
parser.isNamedReplyToTopic(getNamedReplyTo(), isTopic());
+                    replyToDestination = 
getEndpoint().getDestinationCreationStrategy().createDestination(session, 
getNamedReplyTo(), isReplyToTopic);
                 }
-                MessageConsumer messageConsumer = 
getEndpoint().getJmsObjectFactory().createMessageConsumer(session, 
replyToDestination, null, isTopic(), null, true, false, false);
+                MessageConsumer messageConsumer = 
getEndpoint().getJmsObjectFactory().createMessageConsumer(session, 
replyToDestination, null, isReplyToTopic, null, true, false, false);
                 messageConsumer.setMessageListener(new MessageListener() {
                     @Override
                     public void onMessage(final Message message) {

Reply via email to