This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 3b9a5bb4cf43 CAMEL-22705 Reuse existing stream for consumers (#20052)
3b9a5bb4cf43 is described below
commit 3b9a5bb4cf43c630b9ce23f616319ffd434dd74f
Author: Vineet Saurabh <[email protected]>
AuthorDate: Tue Nov 25 15:40:38 2025 +0100
CAMEL-22705 Reuse existing stream for consumers (#20052)
* CAMEL-22705 Reuse existing stream for consumers
* Formatting update
---------
Co-authored-by: Vineet Saurabh <[email protected]>
---
.../apache/camel/component/nats/NatsConsumer.java | 29 ++++++++++++++++++----
1 file changed, 24 insertions(+), 5 deletions(-)
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 5febdea9b82b..4035868c103a 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -165,11 +165,17 @@ public class NatsConsumer extends DefaultConsumer {
this.configuration.getTopic());
JetStreamManagement jsm = connection.jetStreamManagement();
- StreamConfiguration streamConfig = StreamConfiguration.builder()
- .name(streamName)
- .subjects(topic)
- .build();
- jsm.addStream(streamConfig);
+ if (streamExists(jsm, streamName)) {
+ LOG.debug("JetStream stream '{}' already exists. Skipping creation.", streamName);
+ } else {
+ LOG.debug("Creating JetStream {}", streamName);
+ StreamConfiguration streamConfig = StreamConfiguration.builder()
+ .name(streamName)
+ .subjects(topic)
+ .build();
+ jsm.addStream(streamConfig);
+ LOG.info("JetStream stream '{}' created successfully.", streamName);
+ }
ConsumerConfiguration cc =
configuration.getConsumerConfiguration();
if (cc == null) {
@@ -197,6 +203,7 @@ public class NatsConsumer extends DefaultConsumer {
} else {
PushSubscribeOptions pushOptions =
PushSubscribeOptions.builder()
.configuration(cc)
+ .deliverGroup(queueName)
.build();
NatsConsumer.this.jetStreamSubscription =
this.connection.jetStream().subscribe(
@@ -283,4 +290,16 @@ public class NatsConsumer extends DefaultConsumer {
}
}
+ private boolean streamExists(JetStreamManagement jsm, String streamName) throws IOException, JetStreamApiException {
+ try {
+ jsm.getStreamInfo(streamName);
+ return true;
+ } catch (JetStreamApiException jsae) {
+ if (jsae.getErrorCode() == 404) {
+ return false;
+ }
+ throw jsae;
+ }
+ }
+
}