anuragrai16 opened a new issue, #18867: URL: https://github.com/apache/pinot/issues/18867
## Summary

For realtime tables whose [`ParallelSegmentConsumptionPolicy`](https://github.com/apache/pinot/blob/master/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/ParallelSegmentConsumptionPolicy.java) is `DISALLOW_ALWAYS` (e.g. partial-upsert, non-pauseless), each partition uses a single-permit consumer semaphore (`ConsumerCoordinator`, `Semaphore(1)`) so that only one segment per partition consumes at a time. The permit is acquired when a consumer thread starts and is released **only** through the offload path (`doOffload → closeStreamConsumer → releaseConsumerSemaphore`).

Under certain transition orderings, a `RealtimeSegmentDataManager` (RSDM) can be left **running in memory holding the semaphore** while Helix's IdealState shows its segment as `OFFLINE`. No state transition is then able to reach it, so the **next** segment blocks forever on `ConsumerCoordinator.acquire()` and the partition stops ingesting until the stuck segment is manually deleted.

We have seen this issue in production a few times now, stemming from memory pressure on server nodes that caused ZK resets.

## How it happens

- **T: Initial attempt fails during construction.** An `OFFLINE→CONSUMING` for segment `S` throws while building the consuming segment (a transient resource error, e.g. a failed mmap/allocation). The constructor's catch block schedules a background "janitor" thread that sleeps ~30s and then calls `segmentStoppedConsuming` to ask the controller to mark `S` `OFFLINE` in IdealState.
- **T+1: A retry succeeds.** Before the janitor thread fires, a retried `OFFLINE→CONSUMING` for the **same** segment `S` succeeds. A new RSDM is registered and acquires the semaphore; consumption is healthy.
- **T+2: The stale janitor fires.** The janitor from the **failed** attempt still calls `segmentStoppedConsuming` for `S`. The controller flips IdealState for `S` to `OFFLINE`, even though a healthy consumer is now running.
- **T+3: Cleanup is lost.** Normally the server would receive `CONSUMING→OFFLINE` and offload (releasing the semaphore). If that cleanup transition is not delivered or applied (e.g. the participant's ZooKeeper session changes around this moment, destroying the session-scoped `CurrentState` before the transition is generated), the in-memory RSDM survives, because the consumer thread and semaphore are **not** tied to the ZK session, while Helix considers `S` settled at `OFFLINE`. The consumer is now a "ghost": running, holding the semaphore, invisible to Helix.
- **Result.** The successor segment `S+1` gets `OFFLINE→CONSUMING`, but its consumer blocks indefinitely on `acquire()`. Ingestion for that partition stalls until `S` is manually deleted (`OFFLINE→DROPPED`), which offloads the in-memory RSDM and releases the semaphore.

A related, independent gap: even when an `ERROR→OFFLINE` transition **is** delivered, the server's `onBecomeOfflineFromError()` only logs; it does **not** offload the segment. So a semaphore held by a failed `CONSUMING→ONLINE` path can also leak there.

## Impact

- A single partition's realtime ingestion can stall indefinitely (successor segments never acquire the semaphore).
- For upsert tables, the orphaned consumer can also retain primary-key ownership, which can cause missing/incorrect query results until the stuck segment is removed.
- Recovery currently requires manual intervention (deleting the segment).

## Proposed changes

Note: These fixes reduce the probability window of the above happening to near zero but do not completely close it. The complete fix would be to introduce a reconciliation loop in the server to catch these ghost segments, or to override the `reset()` logic so that it properly handles cleanup on a ZK session change. Both are high-risk changes that require a design and review.

### 1. `ERROR→OFFLINE` cleanup

Make `SegmentOnlineOfflineStateModel.onBecomeOfflineFromError()` call `offloadSegment(...)` (wrapped in try/catch, not rethrowing), mirroring `onBecomeOfflineFromConsuming()`.

**Why this fixes it:** `offloadSegment()` runs `doOffload → closeStreamConsumer → releaseConsumerSemaphore`, so any RSDM/semaphore left behind by a failed transition is cleaned up when the segment moves `ERROR→OFFLINE`. It is safe to call unconditionally: `offloadSegment()` is null-safe and `releaseConsumerSemaphore()` uses an idempotent CAS.

### 2. Janitor thread guard before the 30s event fires

In the init-error janitor path (`postStopConsumedMsg`), before sending `segmentStoppedConsuming`, skip the notification if a segment data manager for the same segment is **already registered and is not `this`** (the failed) instance, i.e. a retry has already succeeded.

**Why this fixes it:** a failed attempt never registers itself in the segment map, so a registered manager for the same segment name means a newer attempt is live; suppressing the stale "stopped consuming" prevents the wrongful `IdealState→OFFLINE` flip at its source.
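The `ERROR→OFFLINE` cleanup from proposed change 1 can be illustrated with a minimal, self-contained sketch. It is not Pinot code: `SketchConsumer`, `ErrorToOfflineSketch`, and their methods are hypothetical stand-ins for the RSDM and the state model, and the non-blocking `tryAcquire()` replaces the real blocking `acquire()` only so the demo terminates. It shows a single-permit semaphore held by a ghost consumer, a successor that cannot acquire it, and a CAS-guarded release that is safe to call more than once or with no consumer at all.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a consuming segment's data manager (not Pinot's RSDM).
class SketchConsumer {
  private final Semaphore _partitionSemaphore;
  // Tracks whether THIS consumer currently owns the partition's single permit.
  private final AtomicBoolean _acquired = new AtomicBoolean(false);

  SketchConsumer(Semaphore partitionSemaphore) {
    _partitionSemaphore = partitionSemaphore;
  }

  // Assumption: non-blocking acquire so the sketch terminates;
  // the issue describes a consumer that blocks indefinitely instead.
  boolean start() {
    if (_partitionSemaphore.tryAcquire()) {
      _acquired.set(true);
      return true;
    }
    return false;
  }

  // Idempotent release: the CAS true -> false ensures the permit is
  // returned at most once, however many times offload() is called.
  void offload() {
    if (_acquired.compareAndSet(true, false)) {
      _partitionSemaphore.release();
    }
  }
}

class ErrorToOfflineSketch {
  // Models the proposed handler: offload unconditionally, tolerate a
  // missing consumer, and never rethrow out of the state transition.
  static void onBecomeOfflineFromError(SketchConsumer consumerOrNull) {
    try {
      if (consumerOrNull != null) {
        consumerOrNull.offload();
      }
    } catch (RuntimeException e) {
      System.err.println("Offload failed during ERROR->OFFLINE: " + e);
    }
  }

  public static void main(String[] args) {
    Semaphore partitionSemaphore = new Semaphore(1);
    SketchConsumer ghost = new SketchConsumer(partitionSemaphore);
    SketchConsumer successor = new SketchConsumer(partitionSemaphore);

    System.out.println("ghost acquired: " + ghost.start());
    System.out.println("successor acquired while ghost holds permit: " + successor.start());

    onBecomeOfflineFromError(ghost); // releases the permit
    onBecomeOfflineFromError(ghost); // no-op: already released
    onBecomeOfflineFromError(null);  // no-op: nothing to offload

    System.out.println("permits after cleanup: " + partitionSemaphore.availablePermits());
    System.out.println("successor acquired after cleanup: " + successor.start());
  }
}
```

The flag-plus-CAS design matters because a plain `Semaphore.release()` called twice would raise the permit count to 2 and let two segments of the same partition consume in parallel, which is exactly what `DISALLOW_ALWAYS` is meant to prevent.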

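The janitor guard from proposed change 2 reduces to a single predicate over the server's segment map. The sketch below is an assumption-laden illustration, not the actual patch: `JanitorGuardSketch`, `shouldNotifyStoppedConsuming`, and the `Map<String, Object>` registry are hypothetical names standing in for the janitor path and the table data manager's segment map, and the segment names are made up.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class JanitorGuardSketch {
  // Returns true only when it is still appropriate to tell the controller
  // that the segment stopped consuming. A registered manager that is not the
  // failed instance means a retry has already succeeded, so the stale
  // notification is suppressed.
  static boolean shouldNotifyStoppedConsuming(
      Map<String, Object> registeredManagers, String segmentName, Object failedInstance) {
    Object registered = registeredManagers.get(segmentName);
    return registered == null || registered == failedInstance;
  }

  public static void main(String[] args) {
    // Hypothetical registry: segment name -> currently registered data manager.
    Map<String, Object> registeredManagers = new ConcurrentHashMap<>();
    Object failedAttempt = new Object();
    Object retriedAttempt = new Object();
    String segment = "exampleTable__0__42__sketch"; // made-up segment name

    // No retry yet: the failed attempt never registered itself, so notify.
    System.out.println(shouldNotifyStoppedConsuming(registeredManagers, segment, failedAttempt));

    // A retry succeeded and registered a new manager: suppress the notification.
    registeredManagers.put(segment, retriedAttempt);
    System.out.println(shouldNotifyStoppedConsuming(registeredManagers, segment, failedAttempt));
  }
}
```

The check is an identity comparison rather than `equals()`, since the question is whether the registered object is the very instance whose construction failed, not whether it describes the same segment. A small window remains between the check and the notification, which is consistent with the issue's note that these changes narrow the race rather than eliminate it.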