shivaam commented on issue #59707: URL: https://github.com/apache/airflow/issues/59707#issuecomment-4108348088
## Updated Investigation Report

### Regression Source

Commit `16829d7694` — "Add duplicate hostname check for Celery workers ([#58591](https://github.com/apache/airflow/pull/58591))" — landed in **celery provider 3.14.0**. This is the only CLI change between 3.13.1 and 3.14.0, which is consistent with reports that downgrading to 3.13.1 fixes the issue.

### Root Cause

The duplicate hostname check calls `celery_app.control.inspect().active_queues()` **before** `celery_app.worker_main()`. The `inspect()` call opens broker connections and initializes internal connection/producer pools on the Celery app. When `worker_main()` then starts the worker with the `prefork` pool (which uses `fork()`), this pre-initialized state interferes with the worker's internal communication: tasks are received from the broker but never dispatched to pool workers.

I confirmed this by observing that `app.amqp._producer_pool` is non-None at `worker_main()` time when the bug occurs, and None when it works correctly. Clearing this state with `kombu.pools.reset()` after the `inspect()` call restores correct behavior.

I have not fully traced the exact internal mechanism by which the pre-initialized pools break the prefork dispatch path, but the correlation is consistent across all tests.

### Fixes That Work (verified on live Airflow 3.2 + Celery 5.6.2 + Redis)

**Option A — Remove the duplicate hostname check entirely.** This is the simplest fix, with a minor UX loss (no warning if you accidentally start two workers with the same hostname). Tested and confirmed working.
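The ordering problem above can be illustrated with a minimal pure-Python sketch (an analogy, not Celery's actual internals): anything initialized in the parent process before `fork()` is inherited by every child, rather than each child building fresh state of its own.

```python
import os

# Pure-Python analogy (not Celery itself) of the pre-fork state problem:
# a dict built in the parent before fork() stands in for the connection/
# producer pools primed by inspect() before worker_main().
state = {"initialized_in_pid": os.getpid()}

pid = os.fork()
if pid == 0:
    # Child: sees the object the parent built before forking.
    inherited = state["initialized_in_pid"] != os.getpid()
    os._exit(0 if inherited else 1)

_, status = os.waitpid(pid, 0)
print("child inherited pre-fork state:", os.waitstatus_to_exitcode(status) == 0)
```

For plain data this inheritance is harmless, but for live broker connections and pools it means the prefork children start from the parent's half-open state instead of a clean slate.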
**Option B — Use a temp app + `kombu.pools.reset()`.** Keeps the check and cleans up connection state before `worker_main()`:

```python
if args.celery_hostname:
    from celery import Celery as _TempCelery

    temp_app = _TempCelery(broker=celery_app.conf.broker_url)
    try:
        active_workers = temp_app.control.inspect().active_queues()
        if active_workers:
            celery_hostname = args.celery_hostname
            if any(
                name == celery_hostname or name.endswith(f"@{celery_hostname}")
                for name in active_workers
            ):
                raise SystemExit(
                    f"Error: A worker with hostname '{celery_hostname}' is already running."
                )
    finally:
        temp_app.close()
        import kombu.pools

        kombu.pools.reset()
```

The `kombu.pools.reset()` is essential: `kombu.pools` is a process-global registry keyed by broker URL, so even a separate temp app's connections end up in the same global pool.

### Approaches That Did Not Work

| Approach | Result |
| --- | --- |
| Reset `app.amqp._producer_pool = None` only | Insufficient — `kombu.pools` still holds state |
| Temp app without `kombu.pools.reset()` | Global pools still polluted |
| `app.pool.force_close_all()` | Closes the pool permanently; Celery can't reconnect |

Investigation branch with diagnostic scripts: [`investigate/celery-hostname-59707`](https://github.com/shivaam/airflow/tree/investigate/celery-hostname-59707)

@dheerajturaga Do you have any idea why the duplicate check might be causing this issue?
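The point about `kombu.pools` being process-global can be made concrete with a plain-dict analogy (a sketch, not kombu's real implementation): a module-level registry keyed by broker URL hands every caller in the process the same pool object, which is why even a throwaway temp app pollutes it and why `reset()` has to clear the whole registry.

```python
# Plain-dict analogy (a sketch, not kombu's actual implementation) of a
# process-global connection-pool registry keyed by broker URL.
_pools: dict[str, list] = {}  # module-level, hence shared by the whole process


def get_pool(broker_url: str) -> list:
    """Return the pool for a broker URL, creating it on first use."""
    return _pools.setdefault(broker_url, [])


def reset() -> None:
    """Drop all pooled state, analogous to kombu.pools.reset()."""
    _pools.clear()


# Two independent "apps" using the same broker URL share one pool object,
# so connections opened by a temp app land in the pool the worker later sees.
url = "redis://localhost:6379/0"
temp_app_pool = get_pool(url)
temp_app_pool.append("connection opened by temp app")
worker_pool = get_pool(url)
assert worker_pool is temp_app_pool  # same object: the temp app polluted it

reset()
assert get_pool(url) == []  # fresh, empty pool after reset
```

This is also why resetting only `app.amqp._producer_pool` on one app instance is insufficient: the shared registry lives outside any single app object.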
