This is an automated email from the ASF dual-hosted git repository.
oscerd pushed a commit to branch camel-4.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.18.x by this push:
new 117404f3efb2 CAMEL-23419: camel-nats - Fix JetStream consumer pull
subscription mode (#23087)
117404f3efb2 is described below
commit 117404f3efb2e478b69dda1c650672f104acf3bd
Author: Andrea Cosentino <[email protected]>
AuthorDate: Fri May 8 15:17:42 2026 +0200
CAMEL-23419: camel-nats - Fix JetStream consumer pull subscription mode
(#23087)
Backport of #23084 onto camel-4.18.x.
The JetStream consumer was setting up a pull subscription using the
dispatcher-based subscribe overload but never issued any pull/fetch
requests, so no messages were ever delivered in pull mode (the default).
Switching to push mode via pullSubscription=false was the only workaround.
Fix: use the synchronous pull subscription API and run a fetch loop on the
consumer worker thread. Two new endpoint options control the loop:
- pullBatchSize (default 10): max messages per fetch request
- pullFetchTimeout (default 1000 ms): max wait per fetch request
Also unsubscribe the JetStream subscription on consumer stop.
Adds an integration test covering the pull subscription path.
Upgrade-guide note redirected from 4_21 (not present on 4.18.x) to 4_18.
Closes #23087
---
.../org/apache/camel/catalog/components/nats.json | 70 ++++++++++++----------
.../component/nats/NatsComponentConfigurer.java | 12 ++++
.../component/nats/NatsEndpointConfigurer.java | 12 ++++
.../component/nats/NatsEndpointUriFactory.java | 4 +-
.../org/apache/camel/component/nats/nats.json | 70 ++++++++++++----------
.../camel/component/nats/NatsConfiguration.java | 28 +++++++++
.../apache/camel/component/nats/NatsConsumer.java | 57 ++++++++++++++++--
.../jetstream/NatsJetstreamConsumerPullIT.java | 63 +++++++++++++++++++
.../ROOT/pages/camel-4x-upgrade-guide-4_18.adoc | 12 ++++
.../component/dsl/NatsComponentBuilderFactory.java | 40 +++++++++++++
.../endpoint/dsl/NatsEndpointBuilderFactory.java | 68 +++++++++++++++++++++
11 files changed, 365 insertions(+), 71 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json
index 42f9b7534fe6..85d9a36e95ed 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json
@@ -49,22 +49,24 @@
"maxMessages": { "index": 22, "kind": "property", "displayName": "Max
Messages", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Stop receiving messages from a topic we are
subscribing to after maxMessages" },
"nackWait": { "index": 23, "kind": "property", "displayName": "Nack Wait",
"group": "consumer", "label": "consumer", "required": false, "type": "integer",
"javaType": "long", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": 5000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "For negative acknowledgements (NAK),
redelivery is delayed by 5 seconds (default). Setting this to 0 or [...]
"poolSize": { "index": 24, "kind": "property", "displayName": "Pool Size",
"group": "consumer", "label": "consumer", "required": false, "type": "integer",
"javaType": "int", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": 10, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Consumer thread pool size (default is 10)" },
- "pullSubscription": { "index": 25, "kind": "property", "displayName":
"Pull Subscription", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Sets the consumer
subscription type for JetStream. Set to true to use a Pull Subscri [...]
- "queueName": { "index": 26, "kind": "property", "displayName": "Queue
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "The Queue name if we are using nats for a
queue configuration" },
- "replyToDisabled": { "index": 27, "kind": "property", "displayName":
"Reply To Disabled", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Can be used to turn off
sending back reply message in the consumer." },
- "consumerConfiguration": { "index": 28, "kind": "property", "displayName":
"Consumer Configuration", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets a custom ConsumerConfiguration object f
[...]
- "lazyStartProducer": { "index": 29, "kind": "property", "displayName":
"Lazy Start Producer", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Whether the producer should be started lazy (on the first message). By
starting lazy you can use this to allow CamelContext and routes to startup in
situations where a producer may otherwise fai [...]
- "replySubject": { "index": 30, "kind": "property", "displayName": "Reply
Subject", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "the subject to which subscribers should send
response" },
- "requestTimeout": { "index": 31, "kind": "property", "displayName":
"Request Timeout", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 20000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Request timeout in milliseconds" },
- "autowiredEnabled": { "index": 32, "kind": "property", "displayName":
"Autowired Enabled", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true, "description":
"Whether autowiring is enabled. This is used for automatic autowiring options
(the option must be marked as autowired) by looking up in the registry to find
if there is a single instance of matching [...]
- "connection": { "index": 33, "kind": "property", "displayName":
"Connection", "group": "advanced", "label": "advanced", "required": false,
"type": "object", "javaType": "io.nats.client.Connection", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Reference an already instantiated connection
to Nats server" },
- "headerFilterStrategy": { "index": 34, "kind": "property", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To use a custom header filter strategy." },
- "jetstreamAsync": { "index": 35, "kind": "property", "displayName":
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets whether to operate JetStream requests
asynchronously." },
- "traceConnection": { "index": 36, "kind": "property", "displayName":
"Trace Connection", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Whether or not
connection trace messages should be printed to standard out for fine g [...]
- "credentialsFilePath": { "index": 37, "kind": "property", "displayName":
"Credentials File Path", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "If we use useCredentialsFile to true we'll
need to set the credentialsFilePath option. It c [...]
- "secure": { "index": 38, "kind": "property", "displayName": "Secure",
"group": "security", "label": "security", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Set secure option indicating TLS is required"
},
- "sslContextParameters": { "index": 39, "kind": "property", "displayName":
"Ssl Context Parameters", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To configure security using
SSLContextParameters" },
- "useGlobalSslContextParameters": { "index": 40, "kind": "property",
"displayName": "Use Global Ssl Context Parameters", "group": "security",
"label": "security", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Enable usage of global SSL context
parameters." }
+ "pullBatchSize": { "index": 25, "kind": "property", "displayName": "Pull
Batch Size", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 10, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Maximum number of messages to fetch per pull
request when using a JetStream Pull Subscription. [...]
+ "pullFetchTimeout": { "index": 26, "kind": "property", "displayName":
"Pull Fetch Timeout", "group": "consumer", "label": "consumer", "required":
false, "type": "integer", "javaType": "long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": 1000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Maximum time (in milliseconds) to wait for a
batch of messages to be available on the [...]
+ "pullSubscription": { "index": 27, "kind": "property", "displayName":
"Pull Subscription", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Sets the consumer
subscription type for JetStream. Set to true to use a Pull Subscri [...]
+ "queueName": { "index": 28, "kind": "property", "displayName": "Queue
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "The Queue name if we are using nats for a
queue configuration" },
+ "replyToDisabled": { "index": 29, "kind": "property", "displayName":
"Reply To Disabled", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Can be used to turn off
sending back reply message in the consumer." },
+ "consumerConfiguration": { "index": 30, "kind": "property", "displayName":
"Consumer Configuration", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets a custom ConsumerConfiguration object f
[...]
+ "lazyStartProducer": { "index": 31, "kind": "property", "displayName":
"Lazy Start Producer", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Whether the producer should be started lazy (on the first message). By
starting lazy you can use this to allow CamelContext and routes to startup in
situations where a producer may otherwise fai [...]
+ "replySubject": { "index": 32, "kind": "property", "displayName": "Reply
Subject", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "the subject to which subscribers should send
response" },
+ "requestTimeout": { "index": 33, "kind": "property", "displayName":
"Request Timeout", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 20000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Request timeout in milliseconds" },
+ "autowiredEnabled": { "index": 34, "kind": "property", "displayName":
"Autowired Enabled", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true, "description":
"Whether autowiring is enabled. This is used for automatic autowiring options
(the option must be marked as autowired) by looking up in the registry to find
if there is a single instance of matching [...]
+ "connection": { "index": 35, "kind": "property", "displayName":
"Connection", "group": "advanced", "label": "advanced", "required": false,
"type": "object", "javaType": "io.nats.client.Connection", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Reference an already instantiated connection
to Nats server" },
+ "headerFilterStrategy": { "index": 36, "kind": "property", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To use a custom header filter strategy." },
+ "jetstreamAsync": { "index": 37, "kind": "property", "displayName":
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets whether to operate JetStream requests
asynchronously." },
+ "traceConnection": { "index": 38, "kind": "property", "displayName":
"Trace Connection", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Whether or not
connection trace messages should be printed to standard out for fine g [...]
+ "credentialsFilePath": { "index": 39, "kind": "property", "displayName":
"Credentials File Path", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "If we use useCredentialsFile to true we'll
need to set the credentialsFilePath option. It c [...]
+ "secure": { "index": 40, "kind": "property", "displayName": "Secure",
"group": "security", "label": "security", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Set secure option indicating TLS is required"
},
+ "sslContextParameters": { "index": 41, "kind": "property", "displayName":
"Ssl Context Parameters", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To configure security using
SSLContextParameters" },
+ "useGlobalSslContextParameters": { "index": 42, "kind": "property",
"displayName": "Use Global Ssl Context Parameters", "group": "security",
"label": "security", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Enable usage of global SSL context
parameters." }
},
"headers": {
"CamelNatsMessageTimestamp": { "index": 0, "kind": "header",
"displayName": "", "group": "common", "label": "", "required": false,
"javaType": "long", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The timestamp of a consumed message.",
"constantName":
"org.apache.camel.component.nats.NatsConstants#NATS_MESSAGE_TIMESTAMP" },
@@ -101,22 +103,24 @@
"maxMessages": { "index": 21, "kind": "parameter", "displayName": "Max
Messages", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Stop receiving messages from a topic we are
subscribing to after maxMessages" },
"nackWait": { "index": 22, "kind": "parameter", "displayName": "Nack
Wait", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 5000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "For negative acknowledgements (NAK),
redelivery is delayed by 5 seconds (default). Setting this to 0 o [...]
"poolSize": { "index": 23, "kind": "parameter", "displayName": "Pool
Size", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 10, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Consumer thread pool size (default is 10)" },
- "pullSubscription": { "index": 24, "kind": "parameter", "displayName":
"Pull Subscription", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Sets the consumer
subscription type for JetStream. Set to true to use a Pull Subscr [...]
- "queueName": { "index": 25, "kind": "parameter", "displayName": "Queue
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "The Queue name if we are using nats for a
queue configuration" },
- "replyToDisabled": { "index": 26, "kind": "parameter", "displayName":
"Reply To Disabled", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Can be used to turn off
sending back reply message in the consumer." },
- "bridgeErrorHandler": { "index": 27, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
- "consumerConfiguration": { "index": 28, "kind": "parameter",
"displayName": "Consumer Configuration", "group": "consumer (advanced)",
"label": "consumer,advanced", "required": false, "type": "object", "javaType":
"io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets a custom ConsumerConfiguration object
[...]
- "exceptionHandler": { "index": 29, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
- "exchangePattern": { "index": 30, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "enum", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
- "replySubject": { "index": 31, "kind": "parameter", "displayName": "Reply
Subject", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "the subject to which subscribers should send
response" },
- "requestTimeout": { "index": 32, "kind": "parameter", "displayName":
"Request Timeout", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 20000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Request timeout in milliseconds" },
- "lazyStartProducer": { "index": 33, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
- "connection": { "index": 34, "kind": "parameter", "displayName":
"Connection", "group": "advanced", "label": "advanced", "required": false,
"type": "object", "javaType": "io.nats.client.Connection", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Reference an already instantiated connection
to Nats server" },
- "headerFilterStrategy": { "index": 35, "kind": "parameter", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To use a custom header filter strategy." },
- "jetstreamAsync": { "index": 36, "kind": "parameter", "displayName":
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets whether to operate JetStream requests
asynchronously." },
- "traceConnection": { "index": 37, "kind": "parameter", "displayName":
"Trace Connection", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Whether or not
connection trace messages should be printed to standard out for fine [...]
- "credentialsFilePath": { "index": 38, "kind": "parameter", "displayName":
"Credentials File Path", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "If we use useCredentialsFile to true we'll
need to set the credentialsFilePath option. It [...]
- "secure": { "index": 39, "kind": "parameter", "displayName": "Secure",
"group": "security", "label": "security", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Set secure option indicating TLS is required"
},
- "sslContextParameters": { "index": 40, "kind": "parameter", "displayName":
"Ssl Context Parameters", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To configure security using
SSLContextParameters" }
+ "pullBatchSize": { "index": 24, "kind": "parameter", "displayName": "Pull
Batch Size", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 10, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Maximum number of messages to fetch per pull
request when using a JetStream Pull Subscription. [...]
+ "pullFetchTimeout": { "index": 25, "kind": "parameter", "displayName":
"Pull Fetch Timeout", "group": "consumer", "label": "consumer", "required":
false, "type": "integer", "javaType": "long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": 1000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Maximum time (in milliseconds) to wait for a
batch of messages to be available on the [...]
+ "pullSubscription": { "index": 26, "kind": "parameter", "displayName":
"Pull Subscription", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Sets the consumer
subscription type for JetStream. Set to true to use a Pull Subscr [...]
+ "queueName": { "index": 27, "kind": "parameter", "displayName": "Queue
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "The Queue name if we are using nats for a
queue configuration" },
+ "replyToDisabled": { "index": 28, "kind": "parameter", "displayName":
"Reply To Disabled", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Can be used to turn off
sending back reply message in the consumer." },
+ "bridgeErrorHandler": { "index": 29, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
+ "consumerConfiguration": { "index": 30, "kind": "parameter",
"displayName": "Consumer Configuration", "group": "consumer (advanced)",
"label": "consumer,advanced", "required": false, "type": "object", "javaType":
"io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets a custom ConsumerConfiguration object
[...]
+ "exceptionHandler": { "index": 31, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
+ "exchangePattern": { "index": 32, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "enum", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
+ "replySubject": { "index": 33, "kind": "parameter", "displayName": "Reply
Subject", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "the subject to which subscribers should send
response" },
+ "requestTimeout": { "index": 34, "kind": "parameter", "displayName":
"Request Timeout", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 20000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Request timeout in milliseconds" },
+ "lazyStartProducer": { "index": 35, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
+ "connection": { "index": 36, "kind": "parameter", "displayName":
"Connection", "group": "advanced", "label": "advanced", "required": false,
"type": "object", "javaType": "io.nats.client.Connection", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Reference an already instantiated connection
to Nats server" },
+ "headerFilterStrategy": { "index": 37, "kind": "parameter", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To use a custom header filter strategy." },
+ "jetstreamAsync": { "index": 38, "kind": "parameter", "displayName":
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets whether to operate JetStream requests
asynchronously." },
+ "traceConnection": { "index": 39, "kind": "parameter", "displayName":
"Trace Connection", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Whether or not
connection trace messages should be printed to standard out for fine [...]
+ "credentialsFilePath": { "index": 40, "kind": "parameter", "displayName":
"Credentials File Path", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "If we use useCredentialsFile to true we'll
need to set the credentialsFilePath option. It [...]
+ "secure": { "index": 41, "kind": "parameter", "displayName": "Secure",
"group": "security", "label": "security", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Set secure option indicating TLS is required"
},
+ "sslContextParameters": { "index": 42, "kind": "parameter", "displayName":
"Ssl Context Parameters", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To configure security using
SSLContextParameters" }
}
}
diff --git
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsComponentConfigurer.java
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsComponentConfigurer.java
index 96cc901207d3..d0891a119914 100644
---
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsComponentConfigurer.java
+++
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsComponentConfigurer.java
@@ -81,6 +81,10 @@ public class NatsComponentConfigurer extends
PropertyConfigurerSupport implement
case "pingInterval":
getOrCreateConfiguration(target).setPingInterval(property(camelContext,
int.class, value)); return true;
case "poolsize":
case "poolSize":
getOrCreateConfiguration(target).setPoolSize(property(camelContext, int.class,
value)); return true;
+ case "pullbatchsize":
+ case "pullBatchSize":
getOrCreateConfiguration(target).setPullBatchSize(property(camelContext,
int.class, value)); return true;
+ case "pullfetchtimeout":
+ case "pullFetchTimeout":
getOrCreateConfiguration(target).setPullFetchTimeout(property(camelContext,
long.class, value)); return true;
case "pullsubscription":
case "pullSubscription":
getOrCreateConfiguration(target).setPullSubscription(property(camelContext,
boolean.class, value)); return true;
case "queuename":
@@ -163,6 +167,10 @@ public class NatsComponentConfigurer extends
PropertyConfigurerSupport implement
case "pingInterval": return int.class;
case "poolsize":
case "poolSize": return int.class;
+ case "pullbatchsize":
+ case "pullBatchSize": return int.class;
+ case "pullfetchtimeout":
+ case "pullFetchTimeout": return long.class;
case "pullsubscription":
case "pullSubscription": return boolean.class;
case "queuename":
@@ -246,6 +254,10 @@ public class NatsComponentConfigurer extends
PropertyConfigurerSupport implement
case "pingInterval": return
getOrCreateConfiguration(target).getPingInterval();
case "poolsize":
case "poolSize": return getOrCreateConfiguration(target).getPoolSize();
+ case "pullbatchsize":
+ case "pullBatchSize": return
getOrCreateConfiguration(target).getPullBatchSize();
+ case "pullfetchtimeout":
+ case "pullFetchTimeout": return
getOrCreateConfiguration(target).getPullFetchTimeout();
case "pullsubscription":
case "pullSubscription": return
getOrCreateConfiguration(target).isPullSubscription();
case "queuename":
diff --git
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
index 3436de25552b..923781a8bc8a 100644
---
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
+++
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
@@ -75,6 +75,10 @@ public class NatsEndpointConfigurer extends
PropertyConfigurerSupport implements
case "pingInterval":
target.getConfiguration().setPingInterval(property(camelContext, int.class,
value)); return true;
case "poolsize":
case "poolSize":
target.getConfiguration().setPoolSize(property(camelContext, int.class,
value)); return true;
+ case "pullbatchsize":
+ case "pullBatchSize":
target.getConfiguration().setPullBatchSize(property(camelContext, int.class,
value)); return true;
+ case "pullfetchtimeout":
+ case "pullFetchTimeout":
target.getConfiguration().setPullFetchTimeout(property(camelContext,
long.class, value)); return true;
case "pullsubscription":
case "pullSubscription":
target.getConfiguration().setPullSubscription(property(camelContext,
boolean.class, value)); return true;
case "queuename":
@@ -156,6 +160,10 @@ public class NatsEndpointConfigurer extends
PropertyConfigurerSupport implements
case "pingInterval": return int.class;
case "poolsize":
case "poolSize": return int.class;
+ case "pullbatchsize":
+ case "pullBatchSize": return int.class;
+ case "pullfetchtimeout":
+ case "pullFetchTimeout": return long.class;
case "pullsubscription":
case "pullSubscription": return boolean.class;
case "queuename":
@@ -238,6 +246,10 @@ public class NatsEndpointConfigurer extends
PropertyConfigurerSupport implements
case "pingInterval": return
target.getConfiguration().getPingInterval();
case "poolsize":
case "poolSize": return target.getConfiguration().getPoolSize();
+ case "pullbatchsize":
+ case "pullBatchSize": return
target.getConfiguration().getPullBatchSize();
+ case "pullfetchtimeout":
+ case "pullFetchTimeout": return
target.getConfiguration().getPullFetchTimeout();
case "pullsubscription":
case "pullSubscription": return
target.getConfiguration().isPullSubscription();
case "queuename":
diff --git
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
index 9252e5d790b3..542551087298 100644
---
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
+++
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
@@ -23,7 +23,7 @@ public class NatsEndpointUriFactory extends
org.apache.camel.support.component.E
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Map<String, String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(41);
+ Set<String> props = new HashSet<>(43);
props.add("ackPolicy");
props.add("ackWait");
props.add("bridgeErrorHandler");
@@ -51,6 +51,8 @@ public class NatsEndpointUriFactory extends
org.apache.camel.support.component.E
props.add("pedantic");
props.add("pingInterval");
props.add("poolSize");
+ props.add("pullBatchSize");
+ props.add("pullFetchTimeout");
props.add("pullSubscription");
props.add("queueName");
props.add("reconnect");
diff --git
a/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json
b/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json
index 42f9b7534fe6..85d9a36e95ed 100644
---
a/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json
+++
b/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json
@@ -49,22 +49,24 @@
"maxMessages": { "index": 22, "kind": "property", "displayName": "Max
Messages", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Stop receiving messages from a topic we are
subscribing to after maxMessages" },
"nackWait": { "index": 23, "kind": "property", "displayName": "Nack Wait",
"group": "consumer", "label": "consumer", "required": false, "type": "integer",
"javaType": "long", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": 5000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "For negative acknowledgements (NAK),
redelivery is delayed by 5 seconds (default). Setting this to 0 or [...]
"poolSize": { "index": 24, "kind": "property", "displayName": "Pool Size",
"group": "consumer", "label": "consumer", "required": false, "type": "integer",
"javaType": "int", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": 10, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Consumer thread pool size (default is 10)" },
- "pullSubscription": { "index": 25, "kind": "property", "displayName":
"Pull Subscription", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Sets the consumer
subscription type for JetStream. Set to true to use a Pull Subscri [...]
- "queueName": { "index": 26, "kind": "property", "displayName": "Queue
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "The Queue name if we are using nats for a
queue configuration" },
- "replyToDisabled": { "index": 27, "kind": "property", "displayName":
"Reply To Disabled", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Can be used to turn off
sending back reply message in the consumer." },
- "consumerConfiguration": { "index": 28, "kind": "property", "displayName":
"Consumer Configuration", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets a custom ConsumerConfiguration object f
[...]
- "lazyStartProducer": { "index": 29, "kind": "property", "displayName":
"Lazy Start Producer", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Whether the producer should be started lazy (on the first message). By
starting lazy you can use this to allow CamelContext and routes to startup in
situations where a producer may otherwise fai [...]
- "replySubject": { "index": 30, "kind": "property", "displayName": "Reply
Subject", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "the subject to which subscribers should send
response" },
- "requestTimeout": { "index": 31, "kind": "property", "displayName":
"Request Timeout", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 20000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Request timeout in milliseconds" },
- "autowiredEnabled": { "index": 32, "kind": "property", "displayName":
"Autowired Enabled", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true, "description":
"Whether autowiring is enabled. This is used for automatic autowiring options
(the option must be marked as autowired) by looking up in the registry to find
if there is a single instance of matching [...]
- "connection": { "index": 33, "kind": "property", "displayName":
"Connection", "group": "advanced", "label": "advanced", "required": false,
"type": "object", "javaType": "io.nats.client.Connection", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Reference an already instantiated connection
to Nats server" },
- "headerFilterStrategy": { "index": 34, "kind": "property", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To use a custom header filter strategy." },
- "jetstreamAsync": { "index": 35, "kind": "property", "displayName":
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets whether to operate JetStream requests
asynchronously." },
- "traceConnection": { "index": 36, "kind": "property", "displayName":
"Trace Connection", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Whether or not
connection trace messages should be printed to standard out for fine g [...]
- "credentialsFilePath": { "index": 37, "kind": "property", "displayName":
"Credentials File Path", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "If we use useCredentialsFile to true we'll
need to set the credentialsFilePath option. It c [...]
- "secure": { "index": 38, "kind": "property", "displayName": "Secure",
"group": "security", "label": "security", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Set secure option indicating TLS is required"
},
- "sslContextParameters": { "index": 39, "kind": "property", "displayName":
"Ssl Context Parameters", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To configure security using
SSLContextParameters" },
- "useGlobalSslContextParameters": { "index": 40, "kind": "property",
"displayName": "Use Global Ssl Context Parameters", "group": "security",
"label": "security", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Enable usage of global SSL context
parameters." }
+ "pullBatchSize": { "index": 25, "kind": "property", "displayName": "Pull
Batch Size", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 10, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Maximum number of messages to fetch per pull
request when using a JetStream Pull Subscription. [...]
+ "pullFetchTimeout": { "index": 26, "kind": "property", "displayName":
"Pull Fetch Timeout", "group": "consumer", "label": "consumer", "required":
false, "type": "integer", "javaType": "long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": 1000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Maximum time (in milliseconds) to wait for a
batch of messages to be available on the [...]
+ "pullSubscription": { "index": 27, "kind": "property", "displayName":
"Pull Subscription", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Sets the consumer
subscription type for JetStream. Set to true to use a Pull Subscri [...]
+ "queueName": { "index": 28, "kind": "property", "displayName": "Queue
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "The Queue name if we are using nats for a
queue configuration" },
+ "replyToDisabled": { "index": 29, "kind": "property", "displayName":
"Reply To Disabled", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Can be used to turn off
sending back reply message in the consumer." },
+ "consumerConfiguration": { "index": 30, "kind": "property", "displayName":
"Consumer Configuration", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets a custom ConsumerConfiguration object f
[...]
+ "lazyStartProducer": { "index": 31, "kind": "property", "displayName":
"Lazy Start Producer", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Whether the producer should be started lazy (on the first message). By
starting lazy you can use this to allow CamelContext and routes to startup in
situations where a producer may otherwise fai [...]
+ "replySubject": { "index": 32, "kind": "property", "displayName": "Reply
Subject", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "the subject to which subscribers should send
response" },
+ "requestTimeout": { "index": 33, "kind": "property", "displayName":
"Request Timeout", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 20000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Request timeout in milliseconds" },
+ "autowiredEnabled": { "index": 34, "kind": "property", "displayName":
"Autowired Enabled", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true, "description":
"Whether autowiring is enabled. This is used for automatic autowiring options
(the option must be marked as autowired) by looking up in the registry to find
if there is a single instance of matching [...]
+ "connection": { "index": 35, "kind": "property", "displayName":
"Connection", "group": "advanced", "label": "advanced", "required": false,
"type": "object", "javaType": "io.nats.client.Connection", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Reference an already instantiated connection
to Nats server" },
+ "headerFilterStrategy": { "index": 36, "kind": "property", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To use a custom header filter strategy." },
+ "jetstreamAsync": { "index": 37, "kind": "property", "displayName":
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets whether to operate JetStream requests
asynchronously." },
+ "traceConnection": { "index": 38, "kind": "property", "displayName":
"Trace Connection", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Whether or not
connection trace messages should be printed to standard out for fine g [...]
+ "credentialsFilePath": { "index": 39, "kind": "property", "displayName":
"Credentials File Path", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "If we use useCredentialsFile to true we'll
need to set the credentialsFilePath option. It c [...]
+ "secure": { "index": 40, "kind": "property", "displayName": "Secure",
"group": "security", "label": "security", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Set secure option indicating TLS is required"
},
+ "sslContextParameters": { "index": 41, "kind": "property", "displayName":
"Ssl Context Parameters", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To configure security using
SSLContextParameters" },
+ "useGlobalSslContextParameters": { "index": 42, "kind": "property",
"displayName": "Use Global Ssl Context Parameters", "group": "security",
"label": "security", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Enable usage of global SSL context
parameters." }
},
"headers": {
"CamelNatsMessageTimestamp": { "index": 0, "kind": "header",
"displayName": "", "group": "common", "label": "", "required": false,
"javaType": "long", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The timestamp of a consumed message.",
"constantName":
"org.apache.camel.component.nats.NatsConstants#NATS_MESSAGE_TIMESTAMP" },
@@ -101,22 +103,24 @@
"maxMessages": { "index": 21, "kind": "parameter", "displayName": "Max
Messages", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Stop receiving messages from a topic we are
subscribing to after maxMessages" },
"nackWait": { "index": 22, "kind": "parameter", "displayName": "Nack
Wait", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 5000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "For negative acknowledgements (NAK),
redelivery is delayed by 5 seconds (default). Setting this to 0 o [...]
"poolSize": { "index": 23, "kind": "parameter", "displayName": "Pool
Size", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 10, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Consumer thread pool size (default is 10)" },
- "pullSubscription": { "index": 24, "kind": "parameter", "displayName":
"Pull Subscription", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Sets the consumer
subscription type for JetStream. Set to true to use a Pull Subscr [...]
- "queueName": { "index": 25, "kind": "parameter", "displayName": "Queue
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "The Queue name if we are using nats for a
queue configuration" },
- "replyToDisabled": { "index": 26, "kind": "parameter", "displayName":
"Reply To Disabled", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Can be used to turn off
sending back reply message in the consumer." },
- "bridgeErrorHandler": { "index": 27, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
- "consumerConfiguration": { "index": 28, "kind": "parameter",
"displayName": "Consumer Configuration", "group": "consumer (advanced)",
"label": "consumer,advanced", "required": false, "type": "object", "javaType":
"io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets a custom ConsumerConfiguration object
[...]
- "exceptionHandler": { "index": 29, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
- "exchangePattern": { "index": 30, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "enum", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
- "replySubject": { "index": 31, "kind": "parameter", "displayName": "Reply
Subject", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "the subject to which subscribers should send
response" },
- "requestTimeout": { "index": 32, "kind": "parameter", "displayName":
"Request Timeout", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 20000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Request timeout in milliseconds" },
- "lazyStartProducer": { "index": 33, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
- "connection": { "index": 34, "kind": "parameter", "displayName":
"Connection", "group": "advanced", "label": "advanced", "required": false,
"type": "object", "javaType": "io.nats.client.Connection", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Reference an already instantiated connection
to Nats server" },
- "headerFilterStrategy": { "index": 35, "kind": "parameter", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To use a custom header filter strategy." },
- "jetstreamAsync": { "index": 36, "kind": "parameter", "displayName":
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets whether to operate JetStream requests
asynchronously." },
- "traceConnection": { "index": 37, "kind": "parameter", "displayName":
"Trace Connection", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Whether or not
connection trace messages should be printed to standard out for fine [...]
- "credentialsFilePath": { "index": 38, "kind": "parameter", "displayName":
"Credentials File Path", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "If we use useCredentialsFile to true we'll
need to set the credentialsFilePath option. It [...]
- "secure": { "index": 39, "kind": "parameter", "displayName": "Secure",
"group": "security", "label": "security", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Set secure option indicating TLS is required"
},
- "sslContextParameters": { "index": 40, "kind": "parameter", "displayName":
"Ssl Context Parameters", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To configure security using
SSLContextParameters" }
+ "pullBatchSize": { "index": 24, "kind": "parameter", "displayName": "Pull
Batch Size", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 10, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Maximum number of messages to fetch per pull
request when using a JetStream Pull Subscription. [...]
+ "pullFetchTimeout": { "index": 25, "kind": "parameter", "displayName":
"Pull Fetch Timeout", "group": "consumer", "label": "consumer", "required":
false, "type": "integer", "javaType": "long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": 1000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Maximum time (in milliseconds) to wait for a
batch of messages to be available on the [...]
+ "pullSubscription": { "index": 26, "kind": "parameter", "displayName":
"Pull Subscription", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Sets the consumer
subscription type for JetStream. Set to true to use a Pull Subscr [...]
+ "queueName": { "index": 27, "kind": "parameter", "displayName": "Queue
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "The Queue name if we are using nats for a
queue configuration" },
+ "replyToDisabled": { "index": 28, "kind": "parameter", "displayName":
"Reply To Disabled", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Can be used to turn off
sending back reply message in the consumer." },
+ "bridgeErrorHandler": { "index": 29, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
+ "consumerConfiguration": { "index": 30, "kind": "parameter",
"displayName": "Consumer Configuration", "group": "consumer (advanced)",
"label": "consumer,advanced", "required": false, "type": "object", "javaType":
"io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets a custom ConsumerConfiguration object
[...]
+ "exceptionHandler": { "index": 31, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
+ "exchangePattern": { "index": 32, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "enum", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
+ "replySubject": { "index": 33, "kind": "parameter", "displayName": "Reply
Subject", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "the subject to which subscribers should send
response" },
+ "requestTimeout": { "index": 34, "kind": "parameter", "displayName":
"Request Timeout", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 20000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Request timeout in milliseconds" },
+ "lazyStartProducer": { "index": 35, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
+ "connection": { "index": 36, "kind": "parameter", "displayName":
"Connection", "group": "advanced", "label": "advanced", "required": false,
"type": "object", "javaType": "io.nats.client.Connection", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Reference an already instantiated connection
to Nats server" },
+ "headerFilterStrategy": { "index": 37, "kind": "parameter", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To use a custom header filter strategy." },
+ "jetstreamAsync": { "index": 38, "kind": "parameter", "displayName":
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets whether to operate JetStream requests
asynchronously." },
+ "traceConnection": { "index": 39, "kind": "parameter", "displayName":
"Trace Connection", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Whether or not
connection trace messages should be printed to standard out for fine [...]
+ "credentialsFilePath": { "index": 40, "kind": "parameter", "displayName":
"Credentials File Path", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "If we use useCredentialsFile to true we'll
need to set the credentialsFilePath option. It [...]
+ "secure": { "index": 41, "kind": "parameter", "displayName": "Secure",
"group": "security", "label": "security", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Set secure option indicating TLS is required"
},
+ "sslContextParameters": { "index": 42, "kind": "parameter", "displayName":
"Ssl Context Parameters", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To configure security using
SSLContextParameters" }
}
}
diff --git
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
index fee6e509fe4b..ea2fa55136a5 100644
---
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
+++
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -99,6 +99,10 @@ public class NatsConfiguration implements Cloneable {
private ConsumerConfiguration consumerConfiguration;
@UriParam(label = "consumer", defaultValue = "true")
private boolean pullSubscription = true;
+ @UriParam(label = "consumer", defaultValue = "10")
+ private int pullBatchSize = 10;
+ @UriParam(label = "consumer", defaultValue = "1000")
+ private long pullFetchTimeout = 1000;
@UriParam(label = "consumer")
private String durableName;
@UriParam(label = "security")
@@ -568,6 +572,30 @@ public class NatsConfiguration implements Cloneable {
this.pullSubscription = pullSubscription;
}
+ public int getPullBatchSize() {
+ return pullBatchSize;
+ }
+
+ /**
+ * Maximum number of messages to fetch per pull request when using a
JetStream Pull Subscription. Only used when
+ * {@code pullSubscription=true}.
+ */
+ public void setPullBatchSize(int pullBatchSize) {
+ this.pullBatchSize = pullBatchSize;
+ }
+
+ public long getPullFetchTimeout() {
+ return pullFetchTimeout;
+ }
+
+ /**
+ * Maximum time (in milliseconds) to wait for a batch of messages to be
available on the server during a single
+ * fetch when using a JetStream Pull Subscription. Only used when {@code
pullSubscription=true}.
+ */
+ public void setPullFetchTimeout(long pullFetchTimeout) {
+ this.pullFetchTimeout = pullFetchTimeout;
+ }
+
/**
* The name to assign to the JetStream durable consumer.
* <p>
diff --git
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index fcedd2c1f2df..c36d0be1fdcc 100644
---
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.nats;
import java.io.IOException;
import java.time.Duration;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import io.nats.client.Connection;
@@ -25,6 +26,7 @@ import io.nats.client.Connection.Status;
import io.nats.client.Dispatcher;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
+import io.nats.client.JetStreamStatusException;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
@@ -96,6 +98,14 @@ public class NatsConsumer extends DefaultConsumer {
}
}
+ if (ObjectHelper.isNotEmpty(this.jetStreamSubscription) &&
this.jetStreamSubscription.isActive()) {
+ try {
+ this.jetStreamSubscription.unsubscribe();
+ } catch (final Exception e) {
+ this.getExceptionHandler().handleException("Error during
unsubscribing", e);
+ }
+ }
+
LOG.debug("Stopping Nats Consumer");
if (this.executor != null) {
if (this.getEndpoint() != null &&
this.getEndpoint().getCamelContext() != null) {
@@ -201,19 +211,26 @@ public class NatsConsumer extends DefaultConsumer {
}
CamelNatsMessageHandler messageHandler = new
CamelNatsMessageHandler();
- NatsConsumer.this.dispatcher =
this.connection.createDispatcher(messageHandler);
if (this.configuration.isPullSubscription()) {
PullSubscribeOptions pullOptions =
PullSubscribeOptions.builder()
.configuration(cc)
.build();
+ LOG.info(
+ "Subscribing topic: {} using AckPolicy: {} (ackWait:{}
nackWait:{} pullBatchSize:{} pullFetchTimeout:{})",
+ configuration.getTopic(),
configuration.getAckPolicy(), configuration.getAckWait(),
+ configuration.getNackWait(),
configuration.getPullBatchSize(), configuration.getPullFetchTimeout());
+
NatsConsumer.this.jetStreamSubscription =
this.connection.jetStream().subscribe(
NatsConsumer.this.getEndpoint().getConfiguration().getTopic(),
- dispatcher,
- messageHandler,
pullOptions);
+
+ NatsConsumer.this.setActive(true);
+ runPullFetchLoop(NatsConsumer.this.jetStreamSubscription,
messageHandler);
} else {
+ NatsConsumer.this.dispatcher =
this.connection.createDispatcher(messageHandler);
+
PushSubscribeOptions pushOptions =
PushSubscribeOptions.builder()
.configuration(cc)
.deliverGroup(queueName)
@@ -232,9 +249,41 @@ public class NatsConsumer extends DefaultConsumer {
messageHandler,
autoAck,
pushOptions);
+
+ NatsConsumer.this.setActive(true);
}
+ }
- NatsConsumer.this.setActive(true);
+ private void runPullFetchLoop(JetStreamSubscription sub,
CamelNatsMessageHandler messageHandler) {
+ int batchSize = configuration.getPullBatchSize();
+ Duration fetchTimeout =
Duration.ofMillis(configuration.getPullFetchTimeout());
+ while (NatsConsumer.this.isRunAllowed() &&
!Thread.currentThread().isInterrupted()) {
+ try {
+ List<Message> messages = sub.fetch(batchSize,
fetchTimeout);
+ for (Message msg : messages) {
+ if (!NatsConsumer.this.isRunAllowed()) {
+ break;
+ }
+ try {
+ messageHandler.onMessage(msg);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ } catch (JetStreamStatusException e) {
+ // Synchronous pull calls may throw
JetStreamStatusException for transient
+ // statuses such as 409 (consumer deleted/push-based). Log
and continue so the
+ // consumer can recover when the consumer state is
restored.
+
NatsConsumer.this.getExceptionHandler().handleException("JetStream status
during pull fetch", e);
+ } catch (IllegalStateException e) {
+ // Subscription was closed (e.g. consumer is stopping),
exit the loop
+ LOG.debug("Pull subscription closed, stopping fetch loop",
e);
+ return;
+ } catch (Exception e) {
+
NatsConsumer.this.getExceptionHandler().handleException("Error during JetStream
pull fetch", e);
+ }
+ }
}
private void setupStandardNatsConsumer(String topic, String queueName,
Integer maxMessages) {
diff --git
a/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerPullIT.java
b/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerPullIT.java
new file mode 100644
index 000000000000..f1a1b23f00d1
--- /dev/null
+++
b/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerPullIT.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats.jetstream;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.nats.NatsConstants;
+import org.apache.camel.component.nats.integration.NatsITSupport;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Isolated;
+
+@Isolated
+public class NatsJetstreamConsumerPullIT extends NatsITSupport {
+
+ @EndpointInject("mock:result")
+ protected MockEndpoint mockResultEndpoint;
+
+ @Test
+ public void testPullConsumer() throws Exception {
+ mockResultEndpoint.expectedBodiesReceived("Hello Pull");
+ mockResultEndpoint.expectedHeaderReceived(NatsConstants.NATS_SUBJECT,
"mytopic-pull");
+
+ template.sendBody("direct:send", "Hello Pull");
+
+ mockResultEndpoint.setAssertPeriod(5000);
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ String producerUri
+ =
"nats:mytopic-pull?jetstreamEnabled=true&jetstreamName=mystream-pull&jetstreamAsync=false";
+ String consumerUri
+ =
"nats:mytopic-pull?jetstreamEnabled=true&jetstreamName=mystream-pull&jetstreamAsync=false&durableName=camel-pull&pullSubscription=true&pullFetchTimeout=500";
+
+ from("direct:send")
+ // when running full test suite then send can fail due
to nats server setup/teardown
+
.errorHandler(defaultErrorHandler().maximumRedeliveries(5))
+ .to(producerUri);
+
+ from(consumerUri).to(mockResultEndpoint);
+ }
+ };
+ }
+}
diff --git
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_18.adoc
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_18.adoc
index 0134495045e2..e891799b75ab 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_18.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_18.adoc
@@ -255,3 +255,15 @@ defense-in-depth only. The primary mitigation should be
configured at the messag
* ActiveMQ Artemis: `deserializationAllowList` / `deserializationDenyList`
(see the Artemis docs)
* ActiveMQ Classic: the `org.apache.activemq.SERIALIZABLE_PACKAGES` system
property
+
+=== camel-nats
+
+Fixed the JetStream consumer pull subscription mode (which is the default) so
that messages are
+actually consumed from the server. Previously the consumer would set up a pull
subscription but never
+issue any pull/fetch requests, so no messages were delivered until the user
explicitly switched to
+push mode with `pullSubscription=false`.
+
+Two new endpoint options have been added to control the pull fetch loop:
+
+* `pullBatchSize` (default `10`) — maximum number of messages to fetch per
pull request.
+* `pullFetchTimeout` (default `1000` ms) — maximum time to wait for a batch on
each fetch.
diff --git
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/NatsComponentBuilderFactory.java
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/NatsComponentBuilderFactory.java
index b78c037b7c72..74051188ea21 100644
---
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/NatsComponentBuilderFactory.java
+++
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/NatsComponentBuilderFactory.java
@@ -509,6 +509,44 @@ public interface NatsComponentBuilderFactory {
}
+ /**
+ * Maximum number of messages to fetch per pull request when using a
+ * JetStream Pull Subscription. Only used when {code
+ * pullSubscription=true}.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 10
+ * Group: consumer
+ *
+ * @param pullBatchSize the value to set
+ * @return the dsl builder
+ */
+ default NatsComponentBuilder pullBatchSize(int pullBatchSize) {
+ doSetProperty("pullBatchSize", pullBatchSize);
+ return this;
+ }
+
+
+ /**
+ * Maximum time (in milliseconds) to wait for a batch of messages to be
+ * available on the server during a single fetch when using a JetStream
+ * Pull Subscription. Only used when {code pullSubscription=true}.
+ *
+ * The option is a: <code>long</code> type.
+ *
+ * Default: 1000
+ * Group: consumer
+ *
+ * @param pullFetchTimeout the value to set
+ * @return the dsl builder
+ */
+ default NatsComponentBuilder pullFetchTimeout(long pullFetchTimeout) {
+ doSetProperty("pullFetchTimeout", pullFetchTimeout);
+ return this;
+ }
+
+
/**
* Sets the consumer subscription type for JetStream. Set to true to
use
* a Pull Subscription (consumer explicitly requests messages). Set to
@@ -841,6 +879,8 @@ public interface NatsComponentBuilderFactory {
case "maxMessages": getOrCreateConfiguration((NatsComponent)
component).setMaxMessages((java.lang.String) value); return true;
case "nackWait": getOrCreateConfiguration((NatsComponent)
component).setNackWait((long) value); return true;
case "poolSize": getOrCreateConfiguration((NatsComponent)
component).setPoolSize((int) value); return true;
+ case "pullBatchSize": getOrCreateConfiguration((NatsComponent)
component).setPullBatchSize((int) value); return true;
+ case "pullFetchTimeout": getOrCreateConfiguration((NatsComponent)
component).setPullFetchTimeout((long) value); return true;
case "pullSubscription": getOrCreateConfiguration((NatsComponent)
component).setPullSubscription((boolean) value); return true;
case "queueName": getOrCreateConfiguration((NatsComponent)
component).setQueueName((java.lang.String) value); return true;
case "replyToDisabled": getOrCreateConfiguration((NatsComponent)
component).setReplyToDisabled((boolean) value); return true;
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
index 004f94329b6b..85a1442fb647 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
@@ -728,6 +728,74 @@ public interface NatsEndpointBuilderFactory {
doSetProperty("poolSize", poolSize);
return this;
}
+ /**
+ * Maximum number of messages to fetch per pull request when using a
+ * JetStream Pull Subscription. Only used when {code
+ * pullSubscription=true}.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 10
+ * Group: consumer
+ *
+ * @param pullBatchSize the value to set
+ * @return the dsl builder
+ */
+ default NatsEndpointConsumerBuilder pullBatchSize(int pullBatchSize) {
+ doSetProperty("pullBatchSize", pullBatchSize);
+ return this;
+ }
+ /**
+ * Maximum number of messages to fetch per pull request when using a
+ * JetStream Pull Subscription. Only used when {code
+ * pullSubscription=true}.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 10
+ * Group: consumer
+ *
+ * @param pullBatchSize the value to set
+ * @return the dsl builder
+ */
+ default NatsEndpointConsumerBuilder pullBatchSize(String
pullBatchSize) {
+ doSetProperty("pullBatchSize", pullBatchSize);
+ return this;
+ }
+ /**
+ * Maximum time (in milliseconds) to wait for a batch of messages to be
+ * available on the server during a single fetch when using a JetStream
+ * Pull Subscription. Only used when {code pullSubscription=true}.
+ *
+ * The option is a: <code>long</code> type.
+ *
+ * Default: 1000
+ * Group: consumer
+ *
+ * @param pullFetchTimeout the value to set
+ * @return the dsl builder
+ */
+ default NatsEndpointConsumerBuilder pullFetchTimeout(long
pullFetchTimeout) {
+ doSetProperty("pullFetchTimeout", pullFetchTimeout);
+ return this;
+ }
+ /**
+ * Maximum time (in milliseconds) to wait for a batch of messages to be
+ * available on the server during a single fetch when using a JetStream
+ * Pull Subscription. Only used when {code pullSubscription=true}.
+ *
+ * The option will be converted to a <code>long</code> type.
+ *
+ * Default: 1000
+ * Group: consumer
+ *
+ * @param pullFetchTimeout the value to set
+ * @return the dsl builder
+ */
+ default NatsEndpointConsumerBuilder pullFetchTimeout(String
pullFetchTimeout) {
+ doSetProperty("pullFetchTimeout", pullFetchTimeout);
+ return this;
+ }
/**
* Sets the consumer subscription type for JetStream. Set to true to
use
* a Pull Subscription (consumer explicitly requests messages). Set to