Croway opened a new pull request, #23480:
URL: https://github.com/apache/camel/pull/23480

   ## Summary
   
   _Claude Code on behalf of Federico Mariani_
   
   When virtual threads are enabled (`camel.threads.virtual.enabled=true`), 
`DefaultThreadPoolFactory.VIRTUAL.newThreadPool()` discards all parameters — 
including `maxQueueSize` — and returns an unbounded 
`Executors.newThreadPerTaskExecutor()`. This destroys the backpressure 
mechanism: polling consumers (SQS, JMS, etc.) pull messages without limit.
   
   This PR adds `BoundedExecutorService`, a semaphore-based `ExecutorService` 
wrapper that enforces a flat concurrency cap on delegated tasks. The 
implementation follows the pattern recommended by [JEP 
444](https://openjdk.org/jeps/444) for limiting concurrency with virtual 
threads.
   
   ### Changes
   
   - **`BoundedExecutorService`** (`camel-util`): wraps any `ExecutorService` 
with a `Semaphore` that limits the maximum number of concurrently delegated 
tasks. Supports three saturation policies via `ThreadPoolRejectedPolicy`:
     - `CallerRuns` (default): blocks up to `keepAliveTime`, then runs on 
caller's thread
     - `Abort`: blocks up to `keepAliveTime`, then throws 
`RejectedExecutionException`
     - `Block` (new): blocks indefinitely until a permit is available
   - **`DefaultThreadPoolFactory.VIRTUAL`**: wraps `newThreadPerTaskExecutor` 
with `BoundedExecutorService` when `maxQueueSize > 0`, using `maxPoolSize + 
maxQueueSize` as the concurrency cap and `keepAliveTime` as the acquisition 
timeout
   - **`ThreadPoolRejectedPolicy`**: adds `Block` policy — blocks the caller 
until capacity is available, for message broker and batch workloads
   - **Documentation**: updates `threading-model.adoc` and 
`virtual-threads.adoc` with rejected policy reference, bounded concurrency 
semantics, and virtual thread specifics
   
   ### Behavioral notes
   
   - Unlike `ThreadPoolExecutor` where pool threads and queued tasks are 
distinct, the semaphore enforces a flat concurrency cap — all permitted tasks 
execute immediately on virtual threads
   - `CallerRuns` tasks execute outside semaphore accounting, so total system 
concurrency may temporarily exceed `maxConcurrent` (same as platform thread 
`CallerRunsPolicy`)
   - `keepAliveTime` is repurposed as the semaphore acquisition timeout (pool 
sizing parameters are not applicable to virtual threads)
   - Exposes operational metrics: `activeCount`, `availablePermits`, 
`waitingCount`, `callerRunsCount`, `rejectedCount`, `delegatedTaskCount`
   
   ## Test plan
   
   - [x] Unit tests: `BoundedExecutorServiceTest` (9 tests — blocking, 
CallerRuns timeout, Abort timeout, Block policy, concurrency bounding, permit 
recycling, shutdown)
   - [x] JMH benchmarks: throughput (no regression vs platform threads), 
concurrency bounding (strict cap enforced), memory (no leaks, stable GC 
allocation), full Camel route end-to-end
   - [ ] CI green


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to