oscerd opened a new pull request, #23084:
URL: https://github.com/apache/camel/pull/23084

   ## Summary
   
   Fixes [CAMEL-23419](https://issues.apache.org/jira/browse/CAMEL-23419): the 
camel-nats JetStream consumer did not consume any messages in pull 
subscription mode, which is the default. The only workaround was to explicitly 
opt into push mode via `pullSubscription=false`.
   
   ## Root cause
   
   In `NatsConsumer.setupJetStreamConsumer`, the pull branch used the 
dispatcher-based subscribe overload:
   
   ```java
   this.connection.jetStream().subscribe(topic, dispatcher, messageHandler, pullOptions);
   ```
   
   In the NATS Java client this overload creates a managed pull subscription, 
**but the caller still must explicitly request batches via `sub.pull(...)` / 
`sub.fetch(...)`**. Camel never did. As a result the consumer started up 
cleanly and the JetStream consumer appeared on the server, but messages stayed 
in `Unprocessed` and were never delivered to the route.
   
   ## Fix
   
   Switch the pull path to the synchronous API:
   
   ```java
   this.connection.jetStream().subscribe(topic, pullOptions);
   ```
   
   …and drive it from a fetch loop on the existing consumer worker thread. The 
loop respects the consumer's `isRunAllowed()` so shutdown is clean. Each 
fetched message goes through the existing `CamelNatsMessageHandler`, so 
ack/nack/redelivery semantics are unchanged. The push path is unchanged.
   
   Two new consumer options expose the loop tuning:
   
   | Option | Default | Description |
   |--------|---------|-------------|
   | `pullBatchSize` | `10` | Max messages per fetch request |
   | `pullFetchTimeout` | `1000` ms | Max time to wait per fetch request |
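
As an illustration of how the two options might be set, the sketch below assembles an endpoint URI string in plain Java. The option names `pullSubscription`, `pullBatchSize` and `pullFetchTimeout` come from this PR; the `jetstreamEnabled` option, the `orders` topic and the helper class itself are assumptions made only for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class NatsPullUriSketch {

    /** Builds a camel-nats style endpoint URI with the pull tuning options set. */
    static String endpointUri(String topic, int pullBatchSize, long pullFetchTimeoutMillis) {
        // LinkedHashMap keeps the options in insertion order in the resulting URI.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("jetstreamEnabled", "true"); // assumed option name, not stated in this PR
        options.put("pullSubscription", "true"); // the default, written out for clarity
        options.put("pullBatchSize", Integer.toString(pullBatchSize));
        options.put("pullFetchTimeout", Long.toString(pullFetchTimeoutMillis)); // milliseconds
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "nats:" + topic + "?" + query;
    }
}
```

For example, `NatsPullUriSketch.endpointUri("orders", 50, 2000)` yields a URI that requests up to 50 messages per fetch and waits at most 2000 ms for each fetch.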
   
   The consumer now also unsubscribes the JetStream subscription in `doStop()` 
so that pull subscriptions are cleaned up properly. Push mode previously relied 
on the dispatcher unsubscribe.
   
   ## Tests
   
   - New integration test `NatsJetstreamConsumerPullIT` covers the default pull 
path against a real NATS JetStream broker.
   - Existing JetStream ITs (`NatsJetstreamConsumerIT`, 
`NatsJetstreamConsumerAckPolicyNoneIT`, `NatsJetstreamConsumerMaxDeliverIT`, 
`NatsJetstreamConsumerRedeliveryIT`) all still pass, so the push path is 
unaffected.
   
   ```
   [INFO] Running ...NatsJetstreamConsumerAckPolicyNoneIT — Tests run: 1, Failures: 0
   [INFO] Running ...NatsJetstreamConsumerIT             — Tests run: 1, Failures: 0
   [INFO] Running ...NatsJetstreamConsumerMaxDeliverIT   — Tests run: 1, Failures: 0
   [INFO] Running ...NatsJetstreamConsumerPullIT         — Tests run: 1, Failures: 0
   [INFO] Running ...NatsJetstreamConsumerRedeliveryIT   — Tests run: 1, Failures: 0
   ```
   
   ## Docs
   
   Added a note to `camel-4x-upgrade-guide-4_21.adoc` describing the fix and 
the two new options.
   
   ## Test plan
   
   - [x] `mvn -DskipTests install` for `camel-nats` (with regen of catalog + 
DSL factories)
   - [x] `mvn test` (unit tests)
   - [x] `mvn verify -Dit.test='NatsJetstream*IT'` (all JetStream ITs, 
including the new pull test)
   - [ ] CI green
   
   ---
   _Claude Code on behalf of Andrea Cosentino_

