oscerd opened a new pull request, #23087:
URL: https://github.com/apache/camel/pull/23087

   ## Backport of #23084
   
   Cherry-pick of #23084 onto `camel-4.18.x`.
   
   **Original PR:** #23084 - CAMEL-23419: camel-nats - Fix JetStream consumer 
pull subscription mode
   **Original author:** @oscerd
   **Target branch:** `camel-4.18.x`
   **Conflicts resolved:** yes — the upgrade-guide note was redirected from 
`camel-4x-upgrade-guide-4_21.adoc` (does not exist on 4.18.x) to 
`camel-4x-upgrade-guide-4_18.adoc`. Per project policy, the matching note on 
`main`'s `camel-4x-upgrade-guide-4_18.adoc` will be added in a follow-up 
doc-sync PR.
   
   ### Original description
   
   Fixes [CAMEL-23419](https://issues.apache.org/jira/browse/CAMEL-23419) — the 
camel-nats JetStream consumer did not actually consume any messages in pull 
subscription mode (which is the default). The only workaround was to explicitly 
opt into push mode via `pullSubscription=false`.
   
   #### Root cause
   
   In `NatsConsumer.setupJetStreamConsumer`, the pull branch used the 
dispatcher-based subscribe overload:
   
   ```java
   this.connection.jetStream().subscribe(topic, dispatcher, messageHandler, 
pullOptions);
   ```
   
   In the NATS Java client this overload creates a managed pull subscription, 
**but the caller still must explicitly request batches via `sub.pull(...)` / 
`sub.fetch(...)`**. Camel never did. As a result the consumer started up 
cleanly and the JetStream consumer appeared on the server, but messages stayed 
in `Unprocessed` and were never delivered to the route.
   
   #### Fix
   
   Switch the pull path to the synchronous API and drive it from a fetch loop 
on the existing consumer worker thread. The push path is unchanged. Two new 
consumer options expose the loop tuning:
   
   | Option | Default | Description |
   |--------|---------|-------------|
   | `pullBatchSize` | `10` | Max messages per fetch request |
   | `pullFetchTimeout` | `1000` ms | Max time to wait per fetch request |
   
   #### Tests
   
   Verified locally on `camel-4.18.x` (jnats 2.25.1):
   
   ```
   [INFO] Running ...NatsJetstreamConsumerAckPolicyNoneIT — Tests run: 1, 
Failures: 0
   [INFO] Running ...NatsJetstreamConsumerIT             — Tests run: 1, 
Failures: 0
   [INFO] Running ...NatsJetstreamConsumerMaxDeliverIT   — Tests run: 1, 
Failures: 0
   [INFO] Running ...NatsJetstreamConsumerPullIT         — Tests run: 1, 
Failures: 0
   [INFO] Running ...NatsJetstreamConsumerRedeliveryIT   — Tests run: 1, 
Failures: 0
   ```
   
   ---
   _Claude Code on behalf of Andrea Cosentino_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to