shivaam commented on issue #59707:
URL: https://github.com/apache/airflow/issues/59707#issuecomment-4108337693
## Updated Investigation Report
*(Correcting my earlier comment with a more precise analysis — I overstated
certainty on some points.)*
### Regression Source
Commit `16829d7694` — "Add duplicate hostname check for Celery workers
(#58591)" — landed in **celery provider 3.14.0**. This is the only CLI change
between 3.13.1 and 3.14.0, consistent with reports that downgrading to 3.13.1
fixes the issue.
### Root Cause
The duplicate hostname check calls
`celery_app.control.inspect().active_queues()` **before**
`celery_app.worker_main()`. The `inspect()` call opens broker connections and
initializes internal connection/producer pools on the Celery app. When
`worker_main()` starts the worker with the `prefork` pool (which uses
`fork()`), this pre-initialized state interferes with the worker's internal
communication — tasks are received from the broker but never dispatched to pool
workers.
I confirmed this by observing that `app.amqp._producer_pool` is non-None at
`worker_main()` time when the bug occurs, and None when it works correctly.
Clearing this state with `kombu.pools.reset()` after the `inspect()` call
restores correct behavior.
I have not fully traced the exact internal mechanism by which the
pre-initialized pools break the prefork dispatch path, but the correlation is
consistent across all tests.
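To make the hazard concrete, here is a minimal, self-contained sketch of the general fork-after-initialization problem described above. It uses no Celery or kombu code: `FakeConnection`, `get_connection`, and `_pool` are hypothetical stand-ins for a pooled broker connection and a process-global pool, so this only models the failure pattern, not Celery's actual internals.

```python
import multiprocessing as mp
import os


class FakeConnection:
    """Hypothetical stand-in for a pooled broker connection."""

    def __init__(self):
        self.owner_pid = None

    def ensure(self):
        # First use binds the connection to the current process,
        # like a socket opened by inspect() in the parent.
        if self.owner_pid is None:
            self.owner_pid = os.getpid()
        return self.owner_pid


_pool = []  # module-global pool, analogous to a process-global registry


def get_connection():
    if not _pool:
        _pool.append(FakeConnection())
    return _pool[0]


def _child(q):
    # The forked child inherits the pool as-is; the connection it gets
    # back still belongs to the parent process.
    q.put(get_connection().ensure())


if __name__ == "__main__":
    get_connection().ensure()      # parent initializes the pool (like inspect())
    ctx = mp.get_context("fork")   # prefork workers use fork() on POSIX
    q = ctx.Queue()
    p = ctx.Process(target=_child, args=(q,))
    p.start()
    owner = q.get()
    p.join()
    # The child's "connection" was opened in the parent process:
    print(owner == os.getpid())    # True
```

The point of the sketch is only that state created before `fork()` silently survives into the child; whatever the exact dispatch-path breakage is, it is consistent with this class of bug.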
### Fixes That Work (verified on live Airflow 3.2 + Celery 5.6.2 + Redis)
**Option A — Remove the duplicate hostname check entirely.** Simplest fix,
minor UX loss (no warning if you accidentally start two workers with the same
hostname). Tested and confirmed working.
**Option B — Use a temp app + `kombu.pools.reset()`:** Keeps the check,
cleans up connection state before `worker_main()`:
```python
if args.celery_hostname:
    from celery import Celery as _TempCelery

    # Use a throwaway app so the main celery_app never opens a broker
    # connection before worker_main().
    temp_app = _TempCelery(broker=celery_app.conf.broker_url)
    try:
        active_workers = temp_app.control.inspect().active_queues()
        if active_workers:
            celery_hostname = args.celery_hostname
            if any(
                name == celery_hostname or name.endswith(f"@{celery_hostname}")
                for name in active_workers
            ):
                raise SystemExit(
                    f"Error: A worker with hostname '{celery_hostname}' is already running."
                )
    finally:
        temp_app.close()
        # kombu.pools is process-global, so even the temp app's
        # connections must be flushed before worker_main().
        import kombu.pools

        kombu.pools.reset()
```
The `kombu.pools.reset()` is essential — `kombu.pools` is a process-global
registry keyed by broker URL, so even a separate temp app's connections end up
in the same global pool.
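A simplified pure-Python model of that registry may make this clearer. This is not kombu's actual implementation (the real registry lives in `kombu/pools.py` and is keyed by connection parameters, not a plain URL string); the names here are illustrative only.

```python
class ProducerPool:
    """Illustrative stand-in for a pooled producer/connection set."""

    def __init__(self, url):
        self.url = url


_global_pools = {}  # process-global: shared by every app in this process


def connections(broker_url):
    """Return the pool for broker_url, creating it on first use."""
    if broker_url not in _global_pools:
        _global_pools[broker_url] = ProducerPool(broker_url)
    return _global_pools[broker_url]


def reset():
    """What kombu.pools.reset() does conceptually: drop all pooled state."""
    _global_pools.clear()


# Two "apps" pointing at the same broker share one pool entry:
pool_a = connections("redis://localhost:6379/0")  # main app
pool_b = connections("redis://localhost:6379/0")  # temp app, same URL
assert pool_a is pool_b  # the temp app does not isolate the pool
reset()
assert connections("redis://localhost:6379/0") is not pool_a
```

This is why closing the temp app alone is not enough: the pool is keyed by where you connect, not by which app object connected.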
### Approaches That Did Not Work
| Approach | Result |
|----------|--------|
| Reset `app.amqp._producer_pool = None` only | Insufficient: `kombu.pools` still holds state |
| Temp app without `kombu.pools.reset()` | Global pools still polluted |
| `app.pool.force_close_all()` | Closes the pool permanently; Celery cannot reconnect |
Investigation branch with diagnostic scripts:
[`investigate/celery-hostname-59707`](https://github.com/shivaam/airflow/tree/investigate/celery-hostname-59707)