shivaam commented on issue #59707:
URL: https://github.com/apache/airflow/issues/59707#issuecomment-4108348088

   > ## Updated Investigation Report
   > ### Regression Source
   > Commit `16829d7694` — "Add duplicate hostname check for Celery workers 
([#58591](https://github.com/apache/airflow/pull/58591))" — landed in **celery 
provider 3.14.0**. This is the only CLI change between 3.13.1 and 3.14.0, 
consistent with reports that downgrading to 3.13.1 fixes the issue.
   > 
   > ### Root Cause
   > The duplicate hostname check calls 
`celery_app.control.inspect().active_queues()` **before** 
`celery_app.worker_main()`. The `inspect()` call opens broker connections and 
initializes internal connection/producer pools on the Celery app. When 
`worker_main()` starts the worker with the `prefork` pool (which uses 
`fork()`), this pre-initialized state interferes with the worker's internal 
communication — tasks are received from the broker but never dispatched to pool 
workers.
   > 
   > I confirmed this by observing that `app.amqp._producer_pool` is non-None 
at `worker_main()` time when the bug occurs, and None when it works correctly. 
Clearing this state with `kombu.pools.reset()` after the `inspect()` call 
restores correct behavior.
   > 
   > I have not fully traced the exact internal mechanism by which the 
pre-initialized pools break the prefork dispatch path, but the correlation is 
consistent across all tests.
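To make the suspected mechanism concrete, here is a minimal, Celery-free sketch of how `fork()` propagates pre-initialized state. The `BrokerPool` class is a hypothetical stand-in for kombu's connection/producer pool, not real Celery code; the point is only that anything initialized in the parent before the fork is inherited as-is by the child.

```python
import multiprocessing as mp


class BrokerPool:
    """Hypothetical stand-in for a kombu connection/producer pool."""

    def __init__(self):
        self.initialized = False

    def ensure(self):
        # In the real pool this would open broker sockets.
        self.initialized = True


pool = BrokerPool()


def report(q):
    # A fork()ed child inherits the parent's module globals as-is,
    # including any pool state set up before the fork.
    q.put(pool.initialized)


ctx = mp.get_context("fork")
pool.ensure()  # analogous to inspect(): initializes pools pre-fork

q = ctx.Queue()
p = ctx.Process(target=report, args=(q,))
p.start()
inherited = q.get()
p.join()
print(inherited)  # True: the child starts with pre-initialized pool state
```

In Airflow's case the inherited state is worse than a boolean: it includes live connection objects that the prefork pool children were never meant to share with the parent.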
   > 
   > ### Fixes That Work (verified on live Airflow 3.2 + Celery 5.6.2 + Redis)
   > **Option A — Remove the duplicate hostname check entirely.** Simplest fix, 
minor UX loss (no warning if you accidentally start two workers with the same 
hostname). Tested and confirmed working.
   > 
   > **Option B — Use a temp app + `kombu.pools.reset()`:** Keeps the check, 
cleans up connection state before `worker_main()`:
   > 
   > ```python
   > if args.celery_hostname:
   >     from celery import Celery as _TempCelery
   >
   >     temp_app = _TempCelery(broker=celery_app.conf.broker_url)
   >     try:
   >         active_workers = temp_app.control.inspect().active_queues()
   >         if active_workers:
   >             celery_hostname = args.celery_hostname
   >             if any(
   >                 name == celery_hostname or name.endswith(f"@{celery_hostname}")
   >                 for name in active_workers
   >             ):
   >                 raise SystemExit(
   >                     f"Error: A worker with hostname '{celery_hostname}' is already running."
   >                 )
   >     finally:
   >         temp_app.close()
   >
   >         import kombu.pools
   >
   >         kombu.pools.reset()
   > ```
   > The `kombu.pools.reset()` is essential — `kombu.pools` is a process-global 
registry keyed by broker URL, so even a separate temp app's connections end up 
in the same global pool.
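Why a separate temp app doesn't help on its own can be modelled without kombu at all. The registry below is a simplified sketch of the behavior described above, not kombu's actual implementation: pools live in a module-level dict keyed by broker URL, so every app pointing at the same URL shares one pool, and only a registry-wide `reset()` drops the state.

```python
# Simplified model of a process-global pool registry keyed by broker URL
# (a sketch of kombu.pools' behavior, not its real code).
_pools: dict[str, list[str]] = {}


def get_pool(broker_url: str) -> list[str]:
    # Every "app" asking for the same URL gets the same pool object.
    return _pools.setdefault(broker_url, [])


def reset() -> None:
    # The only way to drop shared state is to clear the global registry.
    _pools.clear()


url = "redis://localhost:6379/0"  # hypothetical broker URL

main_pool = get_pool(url)         # main app's pool
temp_pool = get_pool(url)         # the "temp app" lands in the same pool
temp_pool.append("connection-1")  # temp app's connection pollutes it

print(main_pool is temp_pool)     # True: same shared pool object
print(main_pool)                  # ['connection-1']

reset()
print(get_pool(url))              # []: registry-wide reset clears it
```

This is why closing the temp app alone was insufficient: the temp app object goes away, but its connections were registered under the broker URL, which the main app shares.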
   > 
   > ### Approaches That Did Not Work
   > | Approach | Result |
   > | --- | --- |
   > | Reset `app.amqp._producer_pool = None` only | Insufficient; `kombu.pools` still holds state |
   > | Temp app without `kombu.pools.reset()` | Global pools still polluted |
   > | `app.pool.force_close_all()` | Closes the pool permanently; Celery can't reconnect |
   > Investigation branch with diagnostic scripts: 
[`investigate/celery-hostname-59707`](https://github.com/shivaam/airflow/tree/investigate/celery-hostname-59707)
   
   @dheerajturaga Do you have any idea why the duplicate check might be causing this issue?

