shivaam commented on issue #59707:
URL: https://github.com/apache/airflow/issues/59707#issuecomment-4108337693

   ## Updated Investigation Report
   
   *(Correcting my earlier comment with a more precise analysis — I overstated 
certainty on some points.)*
   
   ### Regression Source
   
   Commit `16829d7694` — "Add duplicate hostname check for Celery workers 
(#58591)" — landed in **celery provider 3.14.0**. This is the only CLI change 
between 3.13.1 and 3.14.0, consistent with reports that downgrading to 3.13.1 
fixes the issue.
   
   ### Root Cause
   
   The duplicate hostname check calls 
`celery_app.control.inspect().active_queues()` **before** 
`celery_app.worker_main()`. The `inspect()` call opens broker connections and 
initializes internal connection/producer pools on the Celery app. When 
`worker_main()` starts the worker with the `prefork` pool (which uses 
`fork()`), this pre-initialized state interferes with the worker's internal 
communication — tasks are received from the broker but never dispatched to pool 
workers.
   
   I confirmed this by observing that `app.amqp._producer_pool` is non-None at 
`worker_main()` time when the bug occurs, and None when it works correctly. 
Clearing this state with `kombu.pools.reset()` after the `inspect()` call 
restores correct behavior.
   
   I have not fully traced the exact internal mechanism by which the 
pre-initialized pools break the prefork dispatch path, but the correlation is 
consistent across all tests.
   
   ### Fixes That Work (verified on live Airflow 3.2 + Celery 5.6.2 + Redis)
   
   **Option A — Remove the duplicate hostname check entirely.** Simplest fix, 
minor UX loss (no warning if you accidentally start two workers with the same 
hostname). Tested and confirmed working.
   
   **Option B — Use a temp app + `kombu.pools.reset()`:** Keeps the check, 
cleans up connection state before `worker_main()`:
   
   ```python
   if args.celery_hostname:
       # Local imports: this runs inside the CLI entrypoint.
       import kombu.pools

       from celery import Celery as _TempCelery

       # Do the inspect() on a throwaway app so the real celery_app never
       # initializes broker connections before worker_main() forks.
       temp_app = _TempCelery(broker=celery_app.conf.broker_url)
       try:
           active_workers = temp_app.control.inspect().active_queues()
           if active_workers:
               celery_hostname = args.celery_hostname
               # active_queues() returns a dict keyed by worker nodename,
               # e.g. "celery@hostname".
               if any(
                   name == celery_hostname or name.endswith(f"@{celery_hostname}")
                   for name in active_workers
               ):
                   raise SystemExit(
                       f"Error: A worker with hostname '{celery_hostname}' is already running."
                   )
       finally:
           temp_app.close()
           # kombu's pools are process-global and keyed by broker URL, so
           # the temp app's connections share the registry the worker will
           # use; reset() drops that state before worker_main() runs.
           kombu.pools.reset()
   ```
   
   The `kombu.pools.reset()` is essential — `kombu.pools` is a process-global 
registry keyed by broker URL, so even a separate temp app's connections end up 
in the same global pool.
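
   A minimal sketch of that registry behavior (a simplified analogue with invented names, not kombu's actual code): because the pool key is the broker URL rather than the app object, a temp app and the main app resolve to the same pool, and only a global `reset()` clears it.

```python
# Toy model of a process-global pool registry keyed by broker URL.
# Simplified analogue for illustration; not kombu's implementation.
_registry: dict[str, list] = {}


def get_pool(broker_url: str) -> list:
    # The key is the URL, not the app object, so any app pointing at the
    # same broker resolves to the same pool.
    return _registry.setdefault(broker_url, [])


def reset() -> None:
    # Analogue of kombu.pools.reset(): discard all pooled state.
    _registry.clear()


url = "redis://localhost:6379/0"
main_pool = get_pool(url)        # pool as the worker's app would see it
temp_pool = get_pool(url)        # "temp app", same broker URL
temp_pool.append("connection")   # temp app's connection lands...
print(main_pool)                 # ...in the shared pool: ['connection']
reset()
print(get_pool(url))             # [] after reset
```

   This is why resetting only `app.amqp._producer_pool` is insufficient: the app-level attribute is not where the shared state lives.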
   
   ### Approaches That Did Not Work
   
   | Approach | Result |
   |----------|--------|
   | Reset `app.amqp._producer_pool = None` only | Insufficient: `kombu.pools` still holds state |
   | Temp app without `kombu.pools.reset()` | Global pools still polluted |
   | `app.pool.force_close_all()` | Closes pool permanently, Celery can't reconnect |
   
   Investigation branch with diagnostic scripts: 
[`investigate/celery-hostname-59707`](https://github.com/shivaam/airflow/tree/investigate/celery-hostname-59707)

