This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b9a5bb4cf43 CAMEL-22705 Reuse existing stream for consumers (#20052)
3b9a5bb4cf43 is described below

commit 3b9a5bb4cf43c630b9ce23f616319ffd434dd74f
Author: Vineet Saurabh <[email protected]>
AuthorDate: Tue Nov 25 15:40:38 2025 +0100

    CAMEL-22705 Reuse existing stream for consumers (#20052)
    
    * CAMEL-22705 Reuse existing stream for consumers
    
    * Formatting update
    
    ---------
    
    Co-authored-by: Vineet Saurabh <[email protected]>
---
 .../apache/camel/component/nats/NatsConsumer.java  | 29 ++++++++++++++++++----
 1 file changed, 24 insertions(+), 5 deletions(-)

diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 5febdea9b82b..4035868c103a 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -165,11 +165,17 @@ public class NatsConsumer extends DefaultConsumer {
                     this.configuration.getTopic());
 
             JetStreamManagement jsm = connection.jetStreamManagement();
-            StreamConfiguration streamConfig = StreamConfiguration.builder()
-                    .name(streamName)
-                    .subjects(topic)
-                    .build();
-            jsm.addStream(streamConfig);
+            if (streamExists(jsm, streamName)) {
+                LOG.debug("JetStream stream '{}' already exists. Skipping creation.", streamName);
+            } else {
+                LOG.debug("Creating JetStream {}", streamName);
+                StreamConfiguration streamConfig = StreamConfiguration.builder()
+                        .name(streamName)
+                        .subjects(topic)
+                        .build();
+                jsm.addStream(streamConfig);
+                LOG.info("JetStream stream '{}' created successfully.", streamName);
+            }
 
             ConsumerConfiguration cc = configuration.getConsumerConfiguration();
             if (cc == null) {
@@ -197,6 +203,7 @@ public class NatsConsumer extends DefaultConsumer {
             } else {
                 PushSubscribeOptions pushOptions = PushSubscribeOptions.builder()
                         .configuration(cc)
+                        .deliverGroup(queueName)
                         .build();
 
                 NatsConsumer.this.jetStreamSubscription = this.connection.jetStream().subscribe(
@@ -283,4 +290,16 @@ public class NatsConsumer extends DefaultConsumer {
         }
     }
 
+    private boolean streamExists(JetStreamManagement jsm, String streamName) throws IOException, JetStreamApiException {
+        try {
+            jsm.getStreamInfo(streamName);
+            return true;
+        } catch (JetStreamApiException jsae) {
+            if (jsae.getErrorCode() == 404) {
+                return false;
+            }
+            throw jsae;
+        }
+    }
+
 }

Reply via email to