hemanth-19 opened a new pull request, #25239: URL: https://github.com/apache/pulsar/pull/25239

> **Experimental feature:** disabled by default (`adaptivePublisherThrottlingEnabled=false`).
> A broker restart is required to enable it; all tuning parameters are dynamic config.
>
> **Focused review request:** the areas most likely to have subtle bugs are:
> 1. **Concurrency** (`AdaptivePublishRateLimiter`): the `volatile boolean active` fast path on IO threads (see `handlePublishThrottling`), and the single-writer contract on the controller scheduler thread.
> 2. **Lifecycle**: `BrokerService.startAdaptivePublishThrottleController()` and the `close()` path; `ThrottleType.AdaptivePublishRate` ordinal stability and the new `states[]` slot.
> 3. **Hysteresis**: deactivation requires **both** memory **and** backlog pressure to drop below their low watermarks simultaneously; verify this is correct for your use case.

---

## Summary

- Adds an adaptive publish throttle controller that dynamically reduces producer publish rates when JVM heap usage or per-topic backlog size approaches configurable watermarks.
- **Disabled by default.** Zero code path changes when `adaptivePublisherThrottlingEnabled=false`.
- **Observe-only mode** (`adaptivePublisherThrottlingObserveOnly=true`, toggleable at runtime) computes and logs decisions but never applies throttling, so it is safe for validating in production and usable as an emergency circuit-breaker.
- Controller never dies silently: every cycle is wrapped in `try-catch-finally`; failures increment a dedicated OTel counter.
- `ThrottleType.AdaptivePublishRate` is appended as the last enum constant to preserve existing ordinals 0–6. A guard test (`ThrottleTypeEnumTest`) fails immediately if the enum is accidentally mutated.
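
For reviewers checking the hysteresis item, the watermark behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the class name `HysteresisSketch`, the method `evaluate`, and the sample watermark values are hypothetical, and the activation rule (either signal reaching its high watermark) is an assumption. Only the deactivation rule, both signals back under their low watermarks in the same cycle, comes from the description. `linearPressure` borrows the name listed in the test table, but its signature here is assumed.

```java
/** Illustrative sketch only; names, signatures, and the activation rule are assumptions. */
final class HysteresisSketch {

    /** Maps a reading onto [0, 1]: 0 at or below the low watermark, 1 at or above the high one. */
    static double linearPressure(double value, double low, double high) {
        if (high <= low) {
            throw new IllegalArgumentException("high watermark must exceed low watermark");
        }
        if (value <= low) {
            return 0.0;
        }
        if (value >= high) {
            return 1.0;
        }
        return (value - low) / (high - low);
    }

    private boolean active;

    /**
     * Assumed activation rule: either pressure has reached 1.0 (its high watermark).
     * Deactivation rule from the PR description: BOTH pressures are back at 0.0
     * (at or below their low watermarks) in the same evaluation cycle.
     */
    boolean evaluate(double memoryPressure, double backlogPressure) {
        if (!active) {
            if (memoryPressure >= 1.0 || backlogPressure >= 1.0) {
                active = true;
            }
        } else if (memoryPressure <= 0.0 && backlogPressure <= 0.0) {
            active = false;
        }
        return active;
    }

    public static void main(String[] args) {
        HysteresisSketch sketch = new HysteresisSketch();
        // Heap above its (hypothetical) high watermark, backlog below its low watermark.
        double memory = linearPressure(0.90, 0.70, 0.85);
        double backlog = linearPressure(10, 100, 1000);
        System.out.println("cycle 1 active=" + sketch.evaluate(memory, backlog));
        // Heap has recovered, but backlog now sits between its watermarks.
        System.out.println("cycle 2 active=" + sketch.evaluate(0.0, linearPressure(500, 100, 1000)));
        // Both signals are under their low watermarks.
        System.out.println("cycle 3 active=" + sketch.evaluate(0.0, 0.0));
    }
}
```

In this sketch the second cycle leaves throttling on even though heap pressure has fully recovered, because the backlog is still above its low watermark. That is the case the review request asks readers to confirm suits their workload.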

---

## Diff navigation

### Configuration (reviewers: verify defaults, dynamic flags, and conf entry comments)

| File | What changed |
|------|-------------|
| `ServiceConfiguration.java` | 10 new `@FieldContext` fields: `adaptivePublisherThrottling*` |
| `conf/broker.conf` | 10 new commented-out entries with inline docs and a recommended quick-start snippet |
| `README.md` | New "Experimental broker features" section with quick-start `broker.conf` snippet |

### Broker wiring (reviewers: focus on concurrency, lifecycle, and enum ordinal safety)

| File | What changed |
|------|-------------|
| `ServerCnxThrottleTracker.java` | New `ThrottleType.AdaptivePublishRate` constant (appended last, reentrant); class-level Javadoc on ordinal stability |
| `AbstractTopic.java` | New `AdaptivePublishRateLimiter` field; `handlePublishThrottling()` delegation; uses `ThrottleType.AdaptivePublishRate` |
| `BrokerService.java` | `startAdaptivePublishThrottleController()` lifecycle; `forEachPersistentTopic()` helper; `close()` teardown |
| `AdaptivePublishRateLimiter.java` *(new)* | Per-topic limiter: `volatile boolean active` fast path, asymmetric EWMA, `activate()`/`deactivate()` |
| `AdaptivePublishThrottleController.java` *(new)* | Broker-level scheduler: pressure math, bounded-step rate changes, hysteresis, `observeOnly` guard, `try-catch-finally` safety |
| `OpenTelemetryAdaptiveThrottleStats.java` *(new)* | 6 broker-level OTel metrics (3 always-on + 3 health); 6 per-topic metrics (opt-in) |

### Tests (reviewers: check the observeOnly safety tests and the enum guard tests)

| File | Key assertions |
|------|---------------|
| `AdaptivePublishRateLimiterTest.java` *(new)* | No-op guarantee, activate/deactivate, asymmetric EWMA, `observeOnly` never changes channel autoread |
| `AdaptivePublishThrottleControllerTest.java` *(new)* | `linearPressure()`, bounded-step `computeTargetRate()`, hysteresis, `observeOnly` never calls `activate()` |
| `AdaptiveThrottleEndToEndTest.java` *(new)* | Full control-loop integration: pressure → throttle → drain → deactivate; `observeOnly` IO-thread safety |
| `ThrottleTypeEnumTest.java` *(new)* | Minimum constant count, `AdaptivePublishRate` present and last, declared reentrant |

---

## Design FAQ

**Q: Why introduce a new `ThrottleType.AdaptivePublishRate` instead of reusing `TopicPublishRate`?**

`ServerCnxThrottleTracker` uses reference-counting via `states[ThrottleType.ordinal()]`. If the adaptive limiter shared `TopicPublishRate` with the static rate limiter, a single `unmarkThrottled()` call from the adaptive side could decrement the count that the static limiter had incremented, leaving the connection unthrottled even though the static limit is still exceeded. A dedicated constant keeps the two signals fully independent and eliminates an entire class of ordering bugs.

**Q: Why does the controller not coordinate across brokers? Each broker throttles independently.**

Adaptive throttling reacts to local signals: JVM heap on this JVM, backlog on topics owned by this broker. Cross-broker coordination would require a consensus protocol, introduce network latency on the hot publish path, and create a single point of failure. Local-only decisions are simpler, faster, and fail-safe: if a network partition occurs, each broker still protects itself independently. Cluster-wide load imbalance (e.g. one hot broker) is better addressed by Pulsar's topic-migration and load-balancer machinery than by throttle coordination.

**Q: What happens if the controller thread crashes?**

The evaluation loop is wrapped in `try-catch-finally`. A caught exception:

1. Increments `evaluationFailureCount` (surfaced as OTel counter `pulsar.broker.adaptive.throttle.controller.evaluation.failure.count`).
2. Logs the full stack trace at `ERROR` level so it appears in broker logs immediately.
3. Does **not** kill the `ScheduledExecutorService`: the scheduler reschedules the next cycle unconditionally.
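
The three behaviours listed above can be sketched with a small wrapper around one evaluation cycle. This is a hedged illustration, not the PR's implementation: `ControllerLoopSketch`, `runOneCycle`, `cyclesAttempted`, and `lastEvaluationTimestampMs` are hypothetical names, `java.util.logging` stands in for the broker's logger, and what the real `finally` block does is not stated in the description, so the counter incremented there is an assumption.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Illustrative sketch only; names and the contents of the finally block are assumptions. */
final class ControllerLoopSketch {
    private static final Logger LOG = Logger.getLogger(ControllerLoopSketch.class.getName());

    final AtomicLong evaluationFailureCount = new AtomicLong();
    final AtomicLong cyclesAttempted = new AtomicLong(); // hypothetical bookkeeping
    volatile long lastEvaluationTimestampMs;              // stays 0 until a cycle succeeds

    private final Runnable evaluation;

    ControllerLoopSketch(Runnable evaluation) {
        this.evaluation = evaluation;
    }

    /** Runs one cycle; no Exception escapes, so a periodic schedule is never cancelled by one. */
    void runOneCycle() {
        try {
            evaluation.run();
            // Only a successful cycle advances the timestamp, so persistent
            // failures show up as a stale timestamp plus a climbing failure count.
            lastEvaluationTimestampMs = System.currentTimeMillis();
        } catch (Exception e) {
            evaluationFailureCount.incrementAndGet();
            LOG.log(Level.SEVERE, "Adaptive throttle evaluation failed", e);
        } finally {
            cyclesAttempted.incrementAndGet();
        }
    }

    /** Schedules the wrapped cycle; the caller owns and shuts down the scheduler. */
    void start(ScheduledExecutorService scheduler, long intervalMs) {
        scheduler.scheduleWithFixedDelay(this::runOneCycle, intervalMs, intervalMs,
                TimeUnit.MILLISECONDS);
    }
}
```

The wrapper matters because `ScheduledExecutorService` suppresses all subsequent executions of a periodic task once any execution throws. Catching `Exception` inside the task keeps the schedule alive, while an `Error` is deliberately not caught here and would still end the schedule.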

If the failure is persistent, `last.evaluation.timestamp` stops advancing while `evaluation.failure.count` climbs, which is a clear alert signal. A full controller stall (only possible via an uncaught `Error`) requires a broker restart; the OTel staleness alert will fire within `3 × intervalMs`.

---

## Test plan

- [ ] `mvn test -pl pulsar-broker -am -Dtest="AdaptivePublishRateLimiterTest,AdaptivePublishThrottleControllerTest,AdaptiveThrottleEndToEndTest,ThrottleTypeEnumTest" -DfailIfNoTests=false --no-transfer-progress` passes
- [ ] `mvn test -pl pulsar-broker -am -Dtest="PublishRateLimiterTest" -DfailIfNoTests=false --no-transfer-progress` passes (regression: static rate limiter unaffected)
- [ ] Broker starts with `adaptivePublisherThrottlingEnabled=false` (default): no controller thread, no `AdaptivePublishRateLimiter` allocated, no OTel instruments registered
- [ ] `observeOnly=true`: logs show `OBSERVE-ONLY would-activate`, zero `ACTIVATED` lines, `active.topic.count` metric stays at 0
- [ ] OTel scrape shows all 6 broker-level metrics when feature is enabled

🤖 Generated with [Claude Code](https://claude.com/claude-code)
