potiuk opened a new pull request, #66602:
URL: https://github.com/apache/airflow/pull/66602

   ## Summary
   
   
`providers/celery/tests/integration/celery/test_celery_executor.py::TestCeleryExecutor::test_celery_integration`
 has been failing intermittently with `httpx.ConnectError: [Errno 111] 
Connection refused`. The test is supposed to short-circuit task execution with 
a fake `execute_workload`, but the worker was sometimes invoking the real one, 
which calls the Execution API and fails when no API server is running.
   
   ## Root cause
   
   `_prepare_app` registers the fake task on a fresh `test_app` via:
   
   ```python
   test_execute = test_app.task(name=execute_name)(execute)
   ```
   
   `@app.task(...)` defaults to `shared=True`, which appends a `cons` callback 
to celery's **process-global** `celery._state._on_app_finalizers` set — for 
both the *real* `execute_workload` (decorated at module import) and the *fake* 
registered here.
   
   When `start_worker(app=test_app)` calls `test_app.finalize()`, celery 
iterates that set and runs each `cons(test_app)`:
   
   ```python
   def _announce_app_finalized(app):
       callbacks = set(_on_app_finalizers)
       for callback in callbacks:
           callback(app)
   ```
   
   Set iteration order is **hash-based and non-deterministic**. Each `cons` 
calls `test_app._task_from_fun(fn, name="execute_workload")`, which 
short-circuits if the name is already in `_tasks`:
   
   ```python
   if name not in self._tasks:
       ... create ...
   else:
       task = self._tasks[name]   # keeps existing, doesn't replace
   ```
   
   So whichever finalizer fires first wins. If the real one wins, the worker 
invokes the real `execute_workload` → `BaseExecutor.run_workload` → 
`supervise_task` → tries to connect to the Execution API at `localhost:8080` → 
`Connection refused`. The test asserts that the `success` task ended in 
`State.SUCCESS` but gets `State.FAILED`. `@pytest.mark.flaky(reruns=5)` doesn't 
help because the reruns happen in the same process, where the 
cons-registration outcome stays the same (function ids hash the same).
   
   The trace from the failed runs shows the log line from 
`celery_executor_utils.py:221` (inside the *real* `execute_workload`), 
confirming the fake never won.
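
The first-registration-wins behaviour described in this section can be modelled without celery. The sketch below is a toy stand-in, not celery's code: `ToyApp`, `make_cons`, and `finalize` are hypothetical names, and explicit lists replace the set so that both iteration orders can be shown side by side.

```python
# Toy model of the finalizer race; not celery's actual implementation.
class ToyApp:
    def __init__(self):
        self._tasks = {}

    def _task_from_fun(self, fun, name):
        # First registration wins: an existing name is never replaced.
        if name not in self._tasks:
            self._tasks[name] = fun
        return self._tasks[name]


def real_execute_workload():
    return "real"


def fake_execute_workload():
    return "fake"


def make_cons(fun):
    # Stand-in for the per-task callback that shared=True registers globally.
    def cons(app):
        return app._task_from_fun(fun, name="execute_workload")

    return cons


def finalize(app, callbacks):
    # Run callbacks in the given order (a stand-in for set iteration order),
    # then call whichever task ended up registered under the shared name.
    for callback in callbacks:
        callback(app)
    return app._tasks["execute_workload"]()


real_cons = make_cons(real_execute_workload)
fake_cons = make_cons(fake_execute_workload)

real_first = finalize(ToyApp(), [real_cons, fake_cons])
fake_first = finalize(ToyApp(), [fake_cons, real_cons])
print(real_first, fake_first)  # prints: real fake
```

The same two callbacks produce a different registered task depending only on the order in which they run, which is the behaviour the test was exposed to.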
   
   ## Fix
   
   Force the fake to win deterministically:
   
   1. Finalize `test_app` explicitly. The real finalizer registers the real 
task in `test_app._tasks`.
   2. `pop` that entry.
   3. Register the fake via `test_app.task(name=..., shared=False)(execute)`. 
With `shared=False` the fake doesn't add itself to `_on_app_finalizers`, so it 
stays scoped to `test_app`. With the app already finalized, `lazy=True` 
short-circuits to eager registration, and the resulting `Task` object is what 
`mock.patch.object(celery_executor_utils, execute_name, test_execute)` patches 
in.
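
The three steps above can be sketched on a toy registry. This is an illustration under stated assumptions, not the test's code: `ToyApp` and `register` are hypothetical, and the list `global_finalizers` stands in for celery's process-global finalizer set, which in this model only ever contains the real task because the fake is registered without sharing.

```python
# Toy model of the fix: finalize, pop the real entry, register the fake directly.
NAME = "execute_workload"


class ToyApp:
    def __init__(self):
        self._tasks = {}
        self.finalized = False

    def register(self, fun, name):
        # Same rule as in the root cause: an existing name is kept.
        if name not in self._tasks:
            self._tasks[name] = fun
        return self._tasks[name]

    def finalize(self, finalizers):
        for callback in finalizers:
            callback(self)
        self.finalized = True


def real_execute_workload():
    return "real"


def fake_execute_workload():
    return "fake"


# Only the real task has a global finalizer in this model.
global_finalizers = [lambda app: app.register(real_execute_workload, NAME)]

app = ToyApp()
app.finalize(global_finalizers)  # step 1: the real task lands in _tasks
popped = app._tasks.pop(NAME)  # step 2: drop the real entry
test_execute = app.register(fake_execute_workload, NAME)  # step 3: fake is registered

result = app._tasks[NAME]()
print(result)  # prints: fake
```

Because no finalizer runs after step 3 in this model, nothing can put the real task back under the shared name.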
   
   Also drop the `celery_executor_utils.execute_workload.__wrapped__ = execute` 
lines — celery dispatches tasks via `Task.run`, not `__wrapped__`, so those 
reassignments weren't redirecting anything.
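
A minimal sketch of why reassigning `__wrapped__` has no effect when dispatch goes through `run`. `ToyTask` is a hypothetical stand-in that assumes, as described above, that `__wrapped__` is kept only as a reference to the original function and is not consulted when the task is invoked.

```python
# Toy task object: calls are routed through `run`, never through `__wrapped__`.
class ToyTask:
    def __init__(self, fun):
        self.run = fun
        self.__wrapped__ = fun  # kept for introspection only in this model

    def __call__(self):
        return self.run()


def real_execute_workload():
    return "real"


def fake_execute_workload():
    return "fake"


task = ToyTask(real_execute_workload)
task.__wrapped__ = fake_execute_workload  # the reassignment the PR removes

after_wrapped_swap = task()
print(after_wrapped_swap)  # prints: real
```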
   
   ## Tests
   
   The change touches only the test setup. The existing 
`test_celery_integration` parametrizations are what verify the fix — they now 
reliably hit the fake `execute_workload` regardless of finalizer iteration 
order.
   
   This addresses the kind of failure seen in 
https://github.com/apache/airflow/actions/runs/25548565094/job/74993414446 (and 
similar recent flaky runs of this test).
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [X] Yes — Claude Code (Opus 4.7)
   
   Generated-by: Claude Code (Opus 4.7) following [the 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
