This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 835c7c93fa62c941ff36946dfaeb9558dddb28c1
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Dec 4 08:23:37 2019 +0100

    CAMEL-14252: camel-nats - Add support for reply-to in consumer
---
 .../camel-nats/src/main/docs/nats-component.adoc   | 20 ++++++++++++++----
 .../camel/component/nats/NatsConfiguration.java    | 13 ++++++++++++
 .../apache/camel/component/nats/NatsConsumer.java  | 12 +++++++++++
 ...sumerTest.java => NatsConsumerReplyToTest.java} | 21 +++++++++++++------
 .../camel/component/nats/NatsConsumerTest.java     |  2 +-
 .../endpoint/dsl/NatsEndpointBuilderFactory.java   | 24 ++++++++++++++++++++++
 6 files changed, 81 insertions(+), 11 deletions(-)

diff --git a/components/camel-nats/src/main/docs/nats-component.adoc 
b/components/camel-nats/src/main/docs/nats-component.adoc
index 7a2f06b..ff05266 100644
--- a/components/camel-nats/src/main/docs/nats-component.adoc
+++ b/components/camel-nats/src/main/docs/nats-component.adoc
@@ -72,7 +72,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (27 parameters):
+=== Query Parameters (28 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -97,6 +97,7 @@ with the following path and query parameters:
 | *maxMessages* (consumer) | Stop receiving messages from a topic we are 
subscribing to after maxMessages |  | String
 | *poolSize* (consumer) | Consumer pool size | 10 | int
 | *queueName* (consumer) | The Queue name if we are using nats for a queue 
configuration |  | String
+| *replyToDisabled* (consumer) | Can be used to turn off sending back reply 
message in the consumer. | false | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
option is not in use. By default the consumer will deal with exceptions, that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy 
(on the first message). By starting lazy you can use this to allow CamelContext 
and routes to startup in situations where a producer may otherwise fail during 
starting and cause the route to fail being started. By deferring this startup 
to be lazy then the startup failure can be handled during routing messages via 
Camel's routing error handlers. Beware that when the first message is processed 
then creating and [...]
@@ -147,10 +148,21 @@ The component supports 5 options, which are listed below.
 [width="100%",options="header"]
 |=======================================================================
 |Name |Type |Description
-
-|CamelNatsMessageTimestamp |long |The timestamp of a consumed message.
+| CamelNatsSID | String | The SID of a consumed message.
+| CamelNatsReplyTo | String | The ReplyTo of a consumed message (may be null).
+| CamelNatsSubject | String | The Subject of a consumed message.
+| CamelNatsQueueName | String | The Queue name of a consumed message (may be 
null).
+| CamelNatsMessageTimestamp | long | The timestamp of a consumed message.
 |=======================================================================
- 
+
+== Request/Reply support
+The producer only supports publishing (sending) messages.
+The producer does not support request/reply where it can wait for an expected 
reply message.
+
+The consumer will when routing the message is complete, send back the message 
as reply-message if required.
+
+== Examples
+
 *Producer example:*
 
 [source,java]
diff --git 
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
 
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
index d046a8f..e053b96 100644
--- 
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
+++ 
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -66,6 +66,8 @@ public class NatsConfiguration {
     @UriParam(label = "consumer")
     private String queueName;
     @UriParam(label = "consumer")
+    private boolean replyToDisabled;
+    @UriParam(label = "consumer")
     private String maxMessages;
     @UriParam(label = "consumer", defaultValue = "10")
     private int poolSize = 10;
@@ -258,6 +260,17 @@ public class NatsConfiguration {
         this.queueName = queueName;
     }
 
+    public boolean isReplyToDisabled() {
+        return replyToDisabled;
+    }
+
+    /**
+     * Can be used to turn off sending back reply message in the consumer.
+     */
+    public void setReplyToDisabled(boolean replyToDisabled) {
+        this.replyToDisabled = replyToDisabled;
+    }
+
     /**
      * Stop receiving messages from a topic we are subscribing to after
      * maxMessages
diff --git 
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
 
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index bbcd715..5c905aa 100644
--- 
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ 
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -155,6 +155,18 @@ public class NatsConsumer extends DefaultConsumer {
                 } catch (Exception e) {
                     getExceptionHandler().handleException("Error during 
processing", exchange, e);
                 }
+
+                // is there a reply?
+                if (!configuration.isReplyToDisabled()
+                        && msg.getReplyTo() != null && msg.getConnection() != 
null) {
+                    Connection con = msg.getConnection();
+                    byte[] data = exchange.getMessage().getBody(byte[].class);
+                    if (data != null) {
+                        log.debug("Publishing replyTo: {} message", 
msg.getReplyTo());
+                        con.publish(msg.getReplyTo(), data);
+                    }
+                }
+
             }
         }
     }
diff --git 
a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
 
b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerReplyToTest.java
similarity index 66%
copy from 
components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
copy to 
components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerReplyToTest.java
index 6267648..33a5d5b 100644
--- 
a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
+++ 
b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerReplyToTest.java
@@ -21,19 +21,23 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
 
-public class NatsConsumerTest extends NatsTestSupport {
+public class NatsConsumerReplyToTest extends NatsTestSupport {
 
     @EndpointInject("mock:result")
     protected MockEndpoint mockResultEndpoint;
 
     @Test
-    public void testConsumer() throws Exception {
-        mockResultEndpoint.expectedBodiesReceived("Hello World");
+    public void testReplyTo() throws Exception {
+        mockResultEndpoint.expectedBodiesReceived("World");
         mockResultEndpoint.expectedHeaderReceived(NatsConstants.NATS_SUBJECT, 
"test");
 
-        template.requestBody("direct:send", "Hello World");
+        template.sendBody("direct:send", "World");
 
         mockResultEndpoint.assertIsSatisfied();
+
+        // grab reply message from the reply queue
+        String out = consumer.receiveBody("nats://"  + getNatsUrl() + 
"?topic=myReplyQueue", 5000, String.class);
+        assertEquals("Bye World", out);
     }
 
     @Override
@@ -41,8 +45,13 @@ public class NatsConsumerTest extends NatsTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:send").to("nats://"  + getNatsUrl() + 
"?topic=test&flushConnection=true");
-                from("nats://" + getNatsUrl() + 
"?topic=test&flushConnection=true").to(mockResultEndpoint);
+                from("direct:send")
+                        .to("nats://"  + getNatsUrl() + 
"?topic=test&replySubject=myReplyQueue&flushConnection=true");
+
+                from("nats://" + getNatsUrl() + 
"?topic=test&flushConnection=true")
+                        .to(mockResultEndpoint)
+                        .convertBodyTo(String.class)
+                        .setBody().simple("Bye ${body}");
             }
         };
     }
diff --git 
a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
 
b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
index 6267648..85676a4 100644
--- 
a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
+++ 
b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
@@ -31,7 +31,7 @@ public class NatsConsumerTest extends NatsTestSupport {
         mockResultEndpoint.expectedBodiesReceived("Hello World");
         mockResultEndpoint.expectedHeaderReceived(NatsConstants.NATS_SUBJECT, 
"test");
 
-        template.requestBody("direct:send", "Hello World");
+        template.sendBody("direct:send", "Hello World");
 
         mockResultEndpoint.assertIsSatisfied();
     }
diff --git 
a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
 
b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
index 9e27ffc..b47029a 100644
--- 
a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
+++ 
b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
@@ -468,6 +468,30 @@ public interface NatsEndpointBuilderFactory {
             return this;
         }
         /**
+         * Can be used to turn off sending back reply message in the consumer.
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Group: consumer
+         */
+        default NatsEndpointConsumerBuilder replyToDisabled(
+                boolean replyToDisabled) {
+            doSetProperty("replyToDisabled", replyToDisabled);
+            return this;
+        }
+        /**
+         * Can be used to turn off sending back reply message in the consumer.
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Group: consumer
+         */
+        default NatsEndpointConsumerBuilder replyToDisabled(
+                String replyToDisabled) {
+            doSetProperty("replyToDisabled", replyToDisabled);
+            return this;
+        }
+        /**
          * Set secure option indicating TLS is required.
          * 
          * The option is a: <code>boolean</code> type.

Reply via email to