hemanth-19 opened a new pull request, #25239:
URL: https://github.com/apache/pulsar/pull/25239

   > **Experimental feature.** Disabled by default (`adaptivePublisherThrottlingEnabled=false`).
   > A broker restart is required to enable it; all tuning parameters are dynamic config.
   >
   > **Focused review request.** The areas most likely to have subtle bugs are:
   > 1. **Concurrency** in `AdaptivePublishRateLimiter`: the `volatile boolean active` fast path on IO threads (see `handlePublishThrottling`), and the single-writer contract on the controller scheduler thread.
   > 2. **Lifecycle**: `BrokerService.startAdaptivePublishThrottleController()` and the `close()` path; `ThrottleType.AdaptivePublishRate` ordinal stability and the new `states[]` slot.
   > 3. **Hysteresis**: deactivation requires **both** memory **and** backlog pressure to drop below their low watermarks simultaneously; verify this is correct for your use case.
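
To make the hysteresis rule in item 3 concrete for reviewers, here is a minimal sketch. It is not the PR's code: the class name, the per-signal watermark parameters, and the activation rule (either signal reaching its high watermark) are assumptions made for illustration. Only the deactivation rule (both signals below their low watermarks in the same evaluation) comes from the description above.

```java
/** Hypothetical two-signal hysteresis; names and activation rule are assumptions. */
final class HysteresisSketch {
    private final double memLow, memHigh, backlogLow, backlogHigh;
    private boolean active;

    HysteresisSketch(double memLow, double memHigh, double backlogLow, double backlogHigh) {
        if (memLow >= memHigh || backlogLow >= backlogHigh) {
            throw new IllegalArgumentException("low watermark must be below high watermark");
        }
        this.memLow = memLow;
        this.memHigh = memHigh;
        this.backlogLow = backlogLow;
        this.backlogHigh = backlogHigh;
    }

    /** Returns the throttle state after observing one pair of readings. */
    boolean update(double memory, double backlog) {
        if (!active) {
            // Assumed activation rule: either signal at or above its high watermark.
            if (memory >= memHigh || backlog >= backlogHigh) {
                active = true;
            }
        } else if (memory < memLow && backlog < backlogLow) {
            // Deactivate only when BOTH signals are below their low watermarks.
            active = false;
        }
        return active;
    }
}
```

With this rule, a topic whose backlog stays between its watermarks keeps the throttle active even after heap pressure has fully cleared, which is the behavior reviewers are asked to confirm.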
   
   ---
   
   ## Summary
   
   - Adds an adaptive publish throttle controller that dynamically reduces producer publish rates when JVM heap usage or per-topic backlog size approaches configurable watermarks.
   - **Disabled by default.** No code paths change when `adaptivePublisherThrottlingEnabled=false`.
   - **Observe-only mode** (`adaptivePublisherThrottlingObserveOnly=true`, toggleable at runtime) computes and logs decisions but never applies throttling, so it is safe for validating in production and usable as an emergency circuit-breaker.
   - The controller does not fail silently: every cycle is wrapped in `try-catch-finally`, and failures increment a dedicated OTel counter.
   - `ThrottleType.AdaptivePublishRate` is appended as the last enum constant to preserve existing ordinals 0–6. A guard test (`ThrottleTypeEnumTest`) fails if constants are removed or if `AdaptivePublishRate` is no longer the last, reentrant constant.
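
As a rough illustration of the first bullet, the sketch below maps a reading to a pressure value between two watermarks and then moves a rate toward a pressure-scaled target in bounded steps. The method names `linearPressure` and `computeTargetRate` appear in the test table of this PR, but the signatures, parameters, and formulas here are assumptions, not the controller's actual math.

```java
/** Hypothetical pressure and bounded-step math; formulas are assumptions. */
final class PressureSketch {
    private PressureSketch() {
    }

    /** 0.0 at or below the low watermark, 1.0 at or above the high one, linear in between. */
    static double linearPressure(double value, double low, double high) {
        if (high <= low) {
            throw new IllegalArgumentException("high watermark must exceed low watermark");
        }
        double p = (value - low) / (high - low);
        return Math.max(0.0, Math.min(1.0, p));
    }

    /**
     * Moves currentRate toward a pressure-scaled target, changing it by at most
     * maxStepFraction of currentRate in a single call.
     */
    static double computeTargetRate(double currentRate, double baseRate, double minRate,
                                    double pressure, double maxStepFraction) {
        // Full pressure maps to minRate, zero pressure maps to baseRate.
        double desired = baseRate - pressure * (baseRate - minRate);
        double maxStep = currentRate * maxStepFraction;
        double delta = Math.max(-maxStep, Math.min(maxStep, desired - currentRate));
        return currentRate + delta;
    }
}
```

Bounding the step keeps a sudden pressure spike from collapsing the publish rate within one evaluation cycle.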
   
   ---
   
   ## Diff navigation
   
   ### Configuration (reviewers: verify defaults, dynamic flags, and conf entry comments)
   
   | File | What changed |
   |------|-------------|
   | `ServiceConfiguration.java` | 10 new `@FieldContext` fields: `adaptivePublisherThrottling*` |
   | `conf/broker.conf` | 10 new commented-out entries with inline docs and a recommended quick-start snippet |
   | `README.md` | New "Experimental broker features" section with quick-start `broker.conf` snippet |
   
   ### Broker wiring (reviewers: focus on concurrency, lifecycle, and enum ordinal safety)
   
   | File | What changed |
   |------|-------------|
   | `ServerCnxThrottleTracker.java` | New `ThrottleType.AdaptivePublishRate` constant (appended last, reentrant); class-level Javadoc on ordinal stability |
   | `AbstractTopic.java` | New `AdaptivePublishRateLimiter` field; `handlePublishThrottling()` delegation; uses `ThrottleType.AdaptivePublishRate` |
   | `BrokerService.java` | `startAdaptivePublishThrottleController()` lifecycle; `forEachPersistentTopic()` helper; `close()` teardown |
   | `AdaptivePublishRateLimiter.java` *(new)* | Per-topic limiter: `volatile boolean active` fast path, asymmetric EWMA, `activate()`/`deactivate()` |
   | `AdaptivePublishThrottleController.java` *(new)* | Broker-level scheduler: pressure math, bounded-step rate changes, hysteresis, `observeOnly` guard, `try-catch-finally` safety |
   | `OpenTelemetryAdaptiveThrottleStats.java` *(new)* | 6 broker-level OTel metrics (3 always-on + 3 health); 6 per-topic metrics (opt-in) |
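
For readers unfamiliar with the "asymmetric EWMA" listed for `AdaptivePublishRateLimiter.java`, the following sketch shows the general technique: an exponentially weighted moving average that uses a different smoothing factor depending on whether the sample is above or below the current value. The class name, the factors, and the choice to seed the average with the first sample are assumptions; the PR description does not say which direction reacts faster.

```java
/** Hypothetical asymmetric EWMA; smoothing factors and direction are assumptions. */
final class AsymmetricEwmaSketch {
    private final double alphaUp;   // weight applied when the sample is above the average
    private final double alphaDown; // weight applied when the sample is at or below it
    private double value;
    private boolean initialized;

    AsymmetricEwmaSketch(double alphaUp, double alphaDown) {
        if (alphaUp <= 0 || alphaUp > 1 || alphaDown <= 0 || alphaDown > 1) {
            throw new IllegalArgumentException("smoothing factors must be in (0, 1]");
        }
        this.alphaUp = alphaUp;
        this.alphaDown = alphaDown;
    }

    /** Folds one sample into the average and returns the updated value. */
    double update(double sample) {
        if (!initialized) {
            value = sample; // seed with the first observation
            initialized = true;
            return value;
        }
        double alpha = sample > value ? alphaUp : alphaDown;
        value += alpha * (sample - value);
        return value;
    }
}
```

With `alphaUp` larger than `alphaDown`, the average follows rising samples quickly and decays slowly, which smooths out brief dips.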
   
   ### Tests (reviewers: check the observeOnly safety tests and the enum guard tests)
   
   | File | Key assertions |
   |------|---------------|
   | `AdaptivePublishRateLimiterTest.java` *(new)* | No-op guarantee, activate/deactivate, asymmetric EWMA, `observeOnly` never changes channel autoread |
   | `AdaptivePublishThrottleControllerTest.java` *(new)* | `linearPressure()`, bounded-step `computeTargetRate()`, hysteresis, `observeOnly` never calls `activate()` |
   | `AdaptiveThrottleEndToEndTest.java` *(new)* | Full control-loop integration: pressure → throttle → drain → deactivate; `observeOnly` IO-thread safety |
   | `ThrottleTypeEnumTest.java` *(new)* | Minimum constant count, `AdaptivePublishRate` present and last, declared reentrant |
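
The enum guard assertions in the last row can be pictured with the sketch below. It uses a stand-in enum, `DemoThrottleType`, with placeholder constants and reentrancy flags, because the real `ThrottleType` constant list is not reproduced in this description; the real test presumably uses a test framework rather than a plain method.

```java
/** Hypothetical guard checks against a stand-in enum, not the real ThrottleType. */
final class ThrottleTypeGuardSketch {
    private ThrottleTypeGuardSketch() {
    }

    enum DemoThrottleType {
        ExistingTypeA(false),      // placeholder for the pre-existing constants
        TopicPublishRate(true),    // flag value chosen for illustration only
        AdaptivePublishRate(true); // must stay last so earlier ordinals never shift

        private final boolean reentrant;

        DemoThrottleType(boolean reentrant) {
            this.reentrant = reentrant;
        }

        boolean isReentrant() {
            return reentrant;
        }
    }

    /** Throws AssertionError if any of the three guard conditions is violated. */
    static void checkGuards(int minimumCount) {
        DemoThrottleType[] all = DemoThrottleType.values();
        if (all.length < minimumCount) {
            throw new AssertionError("a constant was removed: " + all.length);
        }
        DemoThrottleType last = all[all.length - 1];
        if (last != DemoThrottleType.AdaptivePublishRate) {
            throw new AssertionError("AdaptivePublishRate is no longer last");
        }
        if (!last.isReentrant()) {
            throw new AssertionError("AdaptivePublishRate must be reentrant");
        }
    }
}
```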
   
   ---
   
   ## Design FAQ
   
   **Q: Why introduce a new `ThrottleType.AdaptivePublishRate` instead of reusing `TopicPublishRate`?**
   
   `ServerCnxThrottleTracker` uses reference-counting via `states[ThrottleType.ordinal()]`.
   If the adaptive limiter shared `TopicPublishRate` with the static rate limiter, a single
   `unmarkThrottled()` call from the adaptive side could decrement the count that the static
   limiter had incremented, leaving the connection unthrottled even though the static limit is
   still exceeded. A dedicated constant keeps the two signals fully independent and eliminates an
   entire class of ordering bugs.
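
The failure mode described in this answer can be reproduced with a small model of per-type reference counting. This is a simplified sketch, not `ServerCnxThrottleTracker` itself: the class, its two-constant enum, and the method bodies are assumptions that keep only the `states[ordinal]` counting idea.

```java
/** Hypothetical model of per-type throttle reference counting. */
final class ThrottleRefCountSketch {
    enum Type { TopicPublishRate, AdaptivePublishRate }

    // One counter per throttle type, indexed by ordinal.
    private final int[] states = new int[Type.values().length];

    void markThrottled(Type type) {
        states[type.ordinal()]++;
    }

    void unmarkThrottled(Type type) {
        if (states[type.ordinal()] > 0) {
            states[type.ordinal()]--;
        }
    }

    /** The connection stays throttled while any type has a non-zero count. */
    boolean isThrottled() {
        for (int count : states) {
            if (count > 0) {
                return true;
            }
        }
        return false;
    }
}
```

If the static limiter marks `TopicPublishRate` and the adaptive side then unmarks the same type, `isThrottled()` returns `false` while the static limit is still exceeded. When the adaptive side unmarks `AdaptivePublishRate` instead, the static limiter's count is untouched and the connection stays throttled.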
   
   **Q: Why does the controller not coordinate across brokers? Each broker throttles independently.**
   
   Adaptive throttling reacts to local signals: JVM heap on this JVM, backlog on topics owned
   by this broker. Cross-broker coordination would require a consensus protocol, introduce network
   latency on the hot publish path, and create a single point of failure. Local-only decisions
   are simpler, faster, and fail-safe: if a network partition occurs, each broker still protects
   itself independently. Cluster-wide load imbalance (e.g. one hot broker) is better addressed
   by Pulsar's topic-migration and load-balancer machinery than by throttle coordination.
   
   **Q: What happens if the controller thread crashes?**
   
   The evaluation loop is wrapped in `try-catch-finally`. A caught exception:
   1. Increments `evaluationFailureCount` (surfaced as OTel counter `pulsar.broker.adaptive.throttle.controller.evaluation.failure.count`).
   2. Logs the full stack trace at `ERROR` level so it appears in broker logs immediately.
   3. Does **not** kill the `ScheduledExecutorService`; the scheduler reschedules the next cycle unconditionally.
   
   If the failure is persistent, `last.evaluation.timestamp` stops advancing while `evaluation.failure.count` climbs, which is a clear alert signal. A full controller stall (possible only when an `Error` is thrown) requires a broker restart; the OTel staleness alert will fire within `3 × intervalMs`.
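
The pattern described in this answer can be sketched as a `Runnable` that contains its own failures. The class and field names below are hypothetical, plain `AtomicLong` fields stand in for the OTel instruments, and logging goes to `System.err` for brevity; the real controller's wiring may differ.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical evaluation cycle that never lets an Exception escape. */
final class SafeEvaluationLoopSketch implements Runnable {
    final AtomicLong evaluationFailureCount = new AtomicLong();
    final AtomicLong lastEvaluationTimestampMs = new AtomicLong();
    final AtomicLong cyclesFinished = new AtomicLong();
    private final Runnable evaluation;

    SafeEvaluationLoopSketch(Runnable evaluation) {
        this.evaluation = evaluation;
    }

    @Override
    public void run() {
        try {
            evaluation.run();
            // Advances only on success, so a stale value signals persistent failure.
            lastEvaluationTimestampMs.set(System.currentTimeMillis());
        } catch (Exception e) {
            // Exceptions are counted and logged; an Error would still propagate.
            evaluationFailureCount.incrementAndGet();
            System.err.println("adaptive throttle evaluation failed: " + e);
        } finally {
            cyclesFinished.incrementAndGet();
        }
    }
}
```

Catching inside `run()` matters because a task submitted through `ScheduledExecutorService.scheduleAtFixedRate` or `scheduleWithFixedDelay` has its subsequent executions suppressed if one execution throws.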
   
   ---
   
   ## Test plan
   
   - [ ] `mvn test -pl pulsar-broker -am -Dtest="AdaptivePublishRateLimiterTest,AdaptivePublishThrottleControllerTest,AdaptiveThrottleEndToEndTest,ThrottleTypeEnumTest" -DfailIfNoTests=false --no-transfer-progress` passes
   - [ ] `mvn test -pl pulsar-broker -am -Dtest="PublishRateLimiterTest" -DfailIfNoTests=false --no-transfer-progress` passes (regression: static rate limiter unaffected)
   - [ ] Broker starts with `adaptivePublisherThrottlingEnabled=false` (default): no controller thread, no `AdaptivePublishRateLimiter` allocated, no OTel instruments registered
   - [ ] `observeOnly=true`: logs show `OBSERVE-ONLY would-activate`, zero `ACTIVATED` lines, `active.topic.count` metric stays at 0
   - [ ] OTel scrape shows all 6 broker-level metrics when the feature is enabled
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)

