lhotari opened a new issue, #25688: URL: https://github.com/apache/pulsar/issues/25688
### Search before reporting

- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Motivation

`PersistentTopic` exposes two state flags that gate concurrent access during teardown:

- [`AbstractTopic.isFenced`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java#L127): transient or persistent fence.
- [`PersistentTopic.isClosingOrDeleting`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L286): a single flag set by [`fenceTopicToCloseOrDelete()`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L4716-L4719), which is called from BOTH `delete()` (line 1611) and `close()` (line 1795, which handles unload/transfer).

Two problems:

**1. The state model conflates close/unload with delete.**

A topic being **unloaded** (or transferred during an ExtensibleLoadManager bundle move) continues to exist: it will be reloaded by the new owner on the next access. A topic being **deleted** is gone permanently. Today both states set `isClosingOrDeleting=true`, so callers cannot tell them apart:

- For client-facing admin APIs, the correct response differs: `404 Not Found` is correct mid-delete but wrong mid-unload (the client should retry or re-look up the topic, not give up).
- For maintenance/scheduled jobs, skipping is correct in both cases, since they shouldn't operate on a topic in any transitional state, but the reasons differ.

**2. Scheduled jobs and several admin APIs don't consult these flags at all, and therefore operate on transitional topics.**

This produces races: for example, the backlog-quota checker mutates managed-ledger cursor state on a topic that the delete path is concurrently tearing down. PR #25684 fixes the `BacklogQuotaManager` instance, but the same pattern repeats at many other call sites.
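As a minimal illustration of the entry-point guard that #25684 adds to `handleExceededBacklogQuota` (an early return when `isFenced()` or `isClosingOrDeleting()`), here is a sketch against a hypothetical, simplified topic class. `GuardedTopic`, its mutation counter, and the boolean return value are assumptions made for illustration; they are not Pulsar's actual code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for PersistentTopic: it models only the
// two flags discussed in this issue, not the real Pulsar class.
final class GuardedTopic {
    private final AtomicBoolean fenced = new AtomicBoolean(false);
    private final AtomicBoolean closingOrDeleting = new AtomicBoolean(false);
    // Stands in for cursor mutations such as skipEntries / markDeletePosition.
    private int cursorMutations = 0;

    boolean isFenced() { return fenced.get(); }
    boolean isClosingOrDeleting() { return closingOrDeleting.get(); }
    void fence() { fenced.set(true); }
    void fenceTopicToCloseOrDelete() { closingOrDeleting.set(true); }
    int cursorMutations() { return cursorMutations; }

    // Entry-point guard: return before touching cursor state if the topic is
    // in any transitional state. Returns true only if work was performed.
    boolean handleExceededBacklogQuota() {
        if (isFenced() || isClosingOrDeleting()) {
            return false; // skip: the teardown path owns the ledger now
        }
        cursorMutations++;
        return true;
    }
}

class GuardDemo {
    public static void main(String[] args) {
        GuardedTopic active = new GuardedTopic();
        GuardedTopic deleting = new GuardedTopic();
        deleting.fenceTopicToCloseOrDelete();
        System.out.println(active.handleExceededBacklogQuota());   // true
        System.out.println(deleting.handleExceededBacklogQuota()); // false
    }
}
```

In this sketch the check and the mutation are not atomic: a teardown that starts right after the check can still overlap with the work, so a guard of this shape narrows the race window rather than eliminating it.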
#### Scheduled-job entry points that don't filter `isFenced` / `isClosingOrDeleting`

All of them iterate via [`BrokerService.forEachTopic`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L2459) / [`forEachPersistentTopic`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L2466) and dispatch to `PersistentTopic` methods that touch managed-ledger, cursor, or subscription state without an entry-point guard:

| Scheduler entry (`BrokerService`) | `PersistentTopic` method | What it touches |
|---|---|---|
| `monitorBacklogQuota` | `BacklogQuotaManager.handleExceededBacklogQuota` | cursors (`skipEntries`, `markDeletePosition`); **fixed in PR #25684** |
| `checkGC` | `checkGC` (line 3552) | may invoke `delete()` |
| `checkMessageExpiry` | `checkMessageExpiry` (line 2255) | reads managed ledger |
| `checkReplicationPolicies` | `checkReplication` (line 2022) | mutates replicators |
| `checkCompaction` | `checkCompaction` (line 2352) | reads compaction subscription, can trigger compaction |
| `checkConsumedLedgers` | `managedLedger.trimConsumedLedgersInBackground` + `rolloverCursorsInBackground` | direct ML mutations |
| `checkMessageDeduplicationInfo` | `checkMessageDeduplicationInfo` (line 2343) | `purgeInactiveProducers` |
| `checkInactiveSubscriptions` | `checkInactiveSubscriptions` (line 3714) | iterates subs, may unsubscribe |
| `checkClusterMigration` | `checkClusterMigration` (line 3363) | reads/sets migration state |
| (deduplication snapshot scheduler, `BrokerService.java:734`) | `checkDeduplicationSnapshot` (line 3774) | takes ML snapshot |
| (called from broker, but per-topic) | `checkBackloggedCursors` (line 3739) | reads ML config + cursor state |

The handful of in-method guards that do exist (`if (isClosingOrDeleting)` at lines 1564, 1781, 2459, 4488, 4701) only protect user-driven actions like `delete()` and `triggerCompaction()`, not the periodic check paths.

#### Admin APIs that access the managed ledger of transitional topics

- [`PersistentTopic.getInternalStats(boolean)`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3151) directly calls `ledger.getManagedLedgerInternalStats(...)` with no state check. On a closing/deleting topic the ledger may be in transition, so the call either races, returns stale data, or fails with a non-domain exception that the admin layer maps to 500.
- Other `PersistentTopic` accessors that read managed-ledger/cursor state (stats, backlog, lookup-time inspectors) likely have the same shape; this issue covers the pattern, with `getInternalStats` as a representative example.

The right response depends on which transitional state the topic is in:

- **Closing/unloading/transferring**: the topic still exists in metadata. Admin should return a transient error (e.g., 503 with `Retry-After`, or transparently wait for the reload) so the caller retries. **404 is wrong here.**
- **Deleting**: the topic is being permanently removed. Returning 404 is correct.

### Solution

Two related changes.

**A. Introduce distinct states for close/unload vs. delete.**

Replace (or augment) the single `isClosingOrDeleting` boolean with a state enum that distinguishes the two terminal/transient cases:

```java
enum TopicLifecycleState {
    ACTIVE,
    FENCED,   // transient, e.g., interceptor exception
    CLOSING,  // unload / transferring, topic will reload elsewhere
    DELETING  // permanent, topic is going away
}
```

`fenceTopicToCloseOrDelete()` becomes two distinct entry points (one called by `delete()`, one by `close()`/transfer). Existing call sites that only need to know whether the topic is in *any* terminal state can use a helper like `isInTeardown()`.

**B. Apply consistent guards.**

- Filter inside `forEachTopic` / `forEachPersistentTopic` so all maintenance jobs uniformly skip non-ACTIVE topics. One change point covers ~10 schedulers.
- Update admin-facing accessors (`getInternalStats` and similar ML/cursor-reading methods) to fail with a distinct exception per state: `NotFoundException` for `DELETING`, and a transient exception (mapped to 503) for `CLOSING`/`FENCED`. The admin REST layer already maps `NotFoundException` → 404; a sibling mapping for the transient case completes the picture.

A short-term mitigation can use the existing `isFenced` / `isClosingOrDeleting` flags: filtering at `forEachTopic` and adding a state check in `getInternalStats` would close most of the gap immediately. The state-enum refactor can follow once the call sites have converged on a common helper.

### Alternatives

- **Keep the single flag, add per-call-site guards.** Already partially done for user-driven actions. Easy to forget, and it doesn't address the close-vs-delete semantic difference for admin responses.
- **Add only a separate `isDeleting` boolean (no enum).** Less invasive, but it adds another flag to keep in sync with `isClosingOrDeleting`. The enum makes it explicit which states are mutually exclusive.
- **Skip-only at the iterator level (no admin-API change).** Fixes the scheduled-job races but leaves the 404-vs-503 distinction unsolved for admin clients.

### Anything else?

Related fixes in flight:

- #25684: fixes the specific `BacklogQuotaManager` race (early return in `handleExceededBacklogQuota` when `isFenced()` or `isClosingOrDeleting()`). It is one instance of the broader pattern this issue describes.
- #25680: a test-side workaround for the symptom in `BacklogQuotaManagerTest` (longer cleanup budget + idempotent `@BeforeMethod`).

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
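For concreteness, proposals A and B from the Solution section can be combined in one small, self-contained model. Only `TopicLifecycleState` and the `isInTeardown()` helper name come from this issue; `LifecycleTopic`, `TopicRegistry`, `fenceTopicToClose()` / `fenceTopicToDelete()`, `AdminResponses.httpStatusFor`, and the rule that teardown may start only from `ACTIVE` are hypothetical choices for this sketch, not Pulsar APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// The enum proposed in this issue.
enum TopicLifecycleState {
    ACTIVE,
    FENCED,   // transient, e.g., interceptor exception
    CLOSING,  // unload / transferring, topic will reload elsewhere
    DELETING  // permanent, topic is going away
}

// Hypothetical stand-in for PersistentTopic (not the real class).
final class LifecycleTopic {
    private final String name;
    private final AtomicReference<TopicLifecycleState> state =
            new AtomicReference<>(TopicLifecycleState.ACTIVE);

    LifecycleTopic(String name) { this.name = name; }

    String name() { return name; }
    TopicLifecycleState state() { return state.get(); }

    // Two distinct entry points replacing fenceTopicToCloseOrDelete().
    // Assumption: teardown may start only from ACTIVE; compareAndSet makes the
    // transition atomic, so a concurrent close() and delete() cannot both win.
    boolean fenceTopicToClose() {
        return state.compareAndSet(TopicLifecycleState.ACTIVE, TopicLifecycleState.CLOSING);
    }
    boolean fenceTopicToDelete() {
        return state.compareAndSet(TopicLifecycleState.ACTIVE, TopicLifecycleState.DELETING);
    }
    boolean fence()   { return state.compareAndSet(TopicLifecycleState.ACTIVE, TopicLifecycleState.FENCED); }
    boolean unfence() { return state.compareAndSet(TopicLifecycleState.FENCED, TopicLifecycleState.ACTIVE); }

    // For call sites that only care about "any terminal state".
    boolean isInTeardown() {
        TopicLifecycleState s = state.get();
        return s == TopicLifecycleState.CLOSING || s == TopicLifecycleState.DELETING;
    }
}

// Hypothetical registry showing the single filter point in forEachTopic.
final class TopicRegistry {
    private final List<LifecycleTopic> topics = new ArrayList<>();

    void add(LifecycleTopic t) { topics.add(t); }

    // Every maintenance job that iterates through here skips non-ACTIVE topics.
    void forEachTopic(Consumer<LifecycleTopic> job) {
        for (LifecycleTopic t : topics) {
            if (t.state() == TopicLifecycleState.ACTIVE) {
                job.accept(t);
            }
        }
    }
}

// Hypothetical per-state admin response: 404 only for DELETING,
// 503 (caller should retry / re-look up) for CLOSING and FENCED.
final class AdminResponses {
    static int httpStatusFor(TopicLifecycleState s) {
        switch (s) {
            case DELETING: return 404;
            case CLOSING:
            case FENCED:   return 503;
            default:       return 200; // ACTIVE: serve the request normally
        }
    }
}

class LifecycleDemo {
    public static void main(String[] args) {
        LifecycleTopic a = new LifecycleTopic("a");
        LifecycleTopic b = new LifecycleTopic("b");
        LifecycleTopic c = new LifecycleTopic("c");
        b.fenceTopicToClose();
        c.fenceTopicToDelete();
        TopicRegistry registry = new TopicRegistry();
        registry.add(a);
        registry.add(b);
        registry.add(c);
        List<String> visited = new ArrayList<>();
        registry.forEachTopic(t -> visited.add(t.name()));
        System.out.println(visited);                                 // [a]
        System.out.println(AdminResponses.httpStatusFor(b.state())); // 503
        System.out.println(AdminResponses.httpStatusFor(c.state())); // 404
    }
}
```

With `compareAndSet`, whichever teardown transition lands first wins and the other call returns `false`, which keeps the close-vs-delete distinction unambiguous. The filter in `forEachTopic` reads the state once per topic, so it is a snapshot: a teardown beginning just after the check can still overlap with the job.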
