shivaam commented on issue #59707:
URL: https://github.com/apache/airflow/issues/59707#issuecomment-4108310186

   ## Investigation Report
   
   I spent time investigating this end-to-end on a live Airflow 3.2 instance 
(CeleryExecutor, Redis broker, PostgreSQL backend, Celery 5.6.2). Here's what I 
found.
   
   ### Regression Source
   
   The bug was introduced by commit `16829d7694` — "Add duplicate hostname check for Celery workers (#58591)" — which landed in **celery provider 3.14.0**. This is confirmed by:
   - `git log --oneline providers-celery/3.13.1..providers-celery/3.14.0 -- providers/celery/src/airflow/providers/celery/cli/celery_command.py` showing it as the **only** CLI change between those versions
   - Multiple comments on this issue confirming that downgrading to 3.13.1 fixes it
   
   ### Root Cause
   
   The duplicate hostname check (lines 222-236 of `celery_command.py`) calls 
`celery_app.control.inspect().active_queues()` **before** 
`celery_app.worker_main()`. This is the problem.
   
   **The chain:**
   
   1. `inspect().active_queues()` broadcasts a control message to Redis and 
waits for replies
   2. This lazily initializes `celery_app.amqp._producer_pool` and opens TCP 
sockets to the Redis broker
   3. These pools and sockets register in **`kombu.pools`** — a 
**process-global** registry keyed by broker URL
   4. `worker_main()` then calls `fork()` to create prefork pool workers
   5. Children inherit the parent's open socket file descriptors
   6. Parent (consumer) and children (pool workers) now share the same Redis 
sockets
   7. The `-O fair` scheduling strategy requires consumer-to-pool IPC over 
these connections
   8. Shared sockets cause silent communication failure — tasks are received 
but never dispatched
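
   The fork-inheritance step in the chain above can be demonstrated with a stdlib-only sketch (no Celery involved): a child process created by `fork()` inherits the parent's open socket descriptors and can use them without reopening, just as prefork pool workers inherit the broker sockets opened by the pre-fork `inspect()` call.

```python
import os
import socket

# Stdlib-only sketch: a fork()ed child inherits the parent's open socket
# file descriptors, just as prefork pool workers inherit the broker
# sockets opened before worker_main() forks.
parent_sock, child_sock = socket.socketpair()

pid = os.fork()
if pid == 0:
    # Child process: the inherited descriptor is live and usable as-is.
    child_sock.sendall(b"written-by-child")
    os._exit(0)

os.waitpid(pid, 0)
data = parent_sock.recv(1024)
print(data)  # b'written-by-child': the parent reads bytes the child wrote on the shared fd
```

   Once parent and children share live broker sockets like this, their traffic can interleave on the same connection, which is the silent-failure mode described in steps 6-8.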
   
   **Key discovery: `kombu.pools` is global, not per-app.** Even creating a 
separate "temp" Celery app for the inspection pollutes the shared global pool 
because `kombu.pools` is keyed by broker URL, not app identity:
   
   ```
   # From our diagnostic script:
   A.pool is B.pool: True                              # Same object!
   A.amqp.producer_pool is B.amqp.producer_pool: True  # Same object!
   ```
   
   ### Live Test Results
   
   **Environment:** Airflow 3.2.0.dev0, Celery 5.6.2, Redis 6.2.20, PostgreSQL 
(RDS), EC2 (Amazon Linux 2023)
   
   | Test | Worker hostname | Result |
   |------|----------------|--------|
   | Baseline (no `--celery-hostname`) | `celery@ip-10-0-2-61...` | Task **succeeded** in 8.1s |
   | Bug repro (with `--celery-hostname`) | `myworker@ip-10-0-2-61...` | Task **stuck in RESERVED** for 60s+ (`acknowledged=False`, `worker_pid=None`) |
   | Fix applied (temp app + `kombu.pools.reset()`) | `myworker@ip-10-0-2-61...` | Task **succeeded** in 2.6s |
   
   **Bug repro logs (task stuck):**
   ```
   Task execute_workload[5c97cd55...] received
   << NO FURTHER OUTPUT — task never dispatched to pool worker >>
   
   celery inspect reserved:
     hostname: [email protected]
     time_start: None
     acknowledged: False
     worker_pid: None
   
   DB: test_celery_hostname | say_hello | queued  (never progressed)
   ```
   
   **Fix verified logs (task executes):**
   ```
   [DEBUG] app.amqp._producer_pool is None (GOOD) at worker_main() time.
   Task execute_workload[b41f1956...] received
   [b41f1956...] Executing workload in Celery: task_id='say_hello'
   Task finished  exit_code=0  final_state=success
   Task execute_workload[b41f1956...] succeeded in 2.596s
   
   DB: say_hello | success
   ```
   
   ### The Fix
   
   Use a temporary Celery app for the `inspect()` call, then **reset 
`kombu.pools`** in a `finally` block to clear any global state pollution:
   
    ```python
    if args.celery_hostname:
        from celery import Celery as _TempCelery

        temp_app = _TempCelery(broker=celery_app.conf.broker_url)
        try:
            active_workers = temp_app.control.inspect().active_queues()
            if active_workers:
                celery_hostname = args.celery_hostname
                if any(
                    name == celery_hostname or name.endswith(f"@{celery_hostname}")
                    for name in active_workers
                ):
                    raise SystemExit(
                        f"Error: A worker with hostname '{celery_hostname}' is already running."
                    )
        finally:
            temp_app.close()
            import kombu.pools

            kombu.pools.reset()  # CRITICAL: clear global pool state before fork
    ```
   
   The `kombu.pools.reset()` is essential — without it, even the temp app's 
connections persist in the global registry and get inherited after fork.
   
   **Secondary fix:** the duplicate hostname detection has a further bug: when the requested hostname itself contains `@` (e.g. `myworker@mymachine`), the `name.endswith("@myworker@mymachine")` check can never match. Adding an exact-match fallback, `name == celery_hostname or name.endswith(f"@{celery_hostname}")`, handles this case.
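
   The matching logic can be checked in isolation (plain Python; the helper name here is ours, not the provider's):

```python
def is_duplicate(name: str, celery_hostname: str) -> bool:
    # endswith() alone misses custom hostnames that already contain "@":
    # "myworker@mymachine".endswith("@myworker@mymachine") is False,
    # so the exact-match check must be tried as well.
    return name == celery_hostname or name.endswith(f"@{celery_hostname}")

print(is_duplicate("celery@box1", "box1"))                       # True: suffix match
print(is_duplicate("myworker@mymachine", "myworker@mymachine"))  # True: exact match
print("myworker@mymachine".endswith("@myworker@mymachine"))      # False: the old check alone
```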
   
   ### Approaches We Tried and Discarded
   
   | Approach | Why it didn't work |
   |----------|-------------------|
   | Convert `args.concurrency` from `int` to `str` | Correct style fix but unrelated to the root cause; Celery handles the int coercion |
   | Reset `app.amqp._producer_pool = None` after inspect | Insufficient; `kombu.pools` still holds open sockets in the global registry |
   | Temp app alone (no `kombu.pools.reset()`) | `kombu.pools` is keyed by broker URL, not app, so the temp app pollutes the same global entry |
   | `app.pool.force_close_all()` | Closes the pool permanently; Celery can't create new connections later |
   | Remove `-O fair` flag | A workaround, not a fix; `-O fair` prevents task starvation in production |
   | Set hostname via `celery_app.conf` instead of CLI | Doesn't address the connection pool pollution |
   
   ### How to Reproduce
   
   ```bash
   # 1. Configure Airflow with CeleryExecutor + Redis broker
   export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
   export AIRFLOW__CELERY__BROKER_URL=redis://localhost:6379/0
   
   # 2. Start scheduler + api-server
   airflow api-server &
   airflow scheduler &
   
   # 3. Start worker WITH --celery-hostname
    airflow celery worker --queues default --concurrency 1 --celery-hostname "myworker@%h"
   
   # 4. Trigger any DAG
   airflow dags trigger <any_dag>
   
   # 5. After 60s, check:
    celery -A airflow.providers.celery.executors.celery_executor_utils.app inspect reserved
   # → Task stuck with acknowledged=False, worker_pid=None
   ```
   
   Full investigation docs and diagnostic scripts are on branch 
[`investigate/celery-hostname-59707`](https://github.com/shivaam/airflow/tree/investigate/celery-hostname-59707)
 in `.claude/celery-hostname-issue-59707/` and `dev/`.

