This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 839835052a8 Fix flaky test_celery_integration with deterministic task
registration (#66602) (#66916)
839835052a8 is described below
commit 839835052a85429e8771f882ca250ec59e110431
Author: Rahul Vats <[email protected]>
AuthorDate: Fri May 15 10:10:30 2026 +0530
Fix flaky test_celery_integration with deterministic task registration
(#66602) (#66916)
The test patched a fake execute_workload via test_app.task(name=...)(fake)
to short-circuit the real one (which would call the Execution API and fail
with Connection refused, since no API server is running in this setup).
But @app.task defaults to shared=True, which adds a finalizer to celery's
process-global _on_app_finalizers set. start_worker triggers
test_app.finalize(),
which iterates that set in non-deterministic (hash-based) order.
_task_from_fun
keeps the existing entry if the task name is already registered, so
whichever
finalizer fires first wins -- making the test pass or fail at random.
Force the fake to win: finalize the app first (real finalizer registers the
real task), then evict that entry and register the fake explicitly with
shared=False so it stays out of the global finalizer set.
Also drop the dead __wrapped__ = execute lines -- celery dispatches via
Task.run, not __wrapped__, so those weren't redirecting anything.
(cherry picked from commit 9575fb3bf9b9c636abb4619c2012029e3119a08d)
Co-authored-by: Jarek Potiuk <[email protected]>
---
.../integration/celery/test_celery_executor.py | 23 ++++++++++++++--------
1 file changed, 15 insertions(+), 8 deletions(-)
diff --git a/providers/celery/tests/integration/celery/test_celery_executor.py
b/providers/celery/tests/integration/celery/test_celery_executor.py
index da8ee15571b..eca326e810d 100644
--- a/providers/celery/tests/integration/celery/test_celery_executor.py
+++ b/providers/celery/tests/integration/celery/test_celery_executor.py
@@ -85,16 +85,23 @@ def _prepare_app(broker_url=None, execute=None):
test_config = dict(celery_executor_utils.get_celery_configuration())
test_config.update({"broker_url": broker_url})
test_app = Celery(broker_url, config_source=test_config)
- # Register the fake execute function with the test_app using the correct
task name
- # This ensures workers using test_app will execute the fake function
- test_execute = test_app.task(name=execute_name)(execute)
+ # Register the fake execute function on test_app under the same task name
as the real
+ # `execute_workload`. The real task uses `@app.task(...)` (shared=True by
default),
+ # which adds a finalizer to celery's process-global `_on_app_finalizers`
set. When
+ # `start_worker(app=test_app)` calls `test_app.finalize()`, celery
iterates that set in
+ # non-deterministic (hash-based) order and calls `_task_from_fun` for each
— and
+ # `_task_from_fun` keeps the existing entry if the task name is already
registered,
+ # so whichever finalizer fires first wins. If the real one wins, the
worker invokes
+ # the real `execute_workload`, which calls the Execution API at
localhost:8080 and
+ # fails with `Connection refused` since no API server is running in this
test setup.
+ # To make the fake win deterministically: finalize the app first (real
finalizer
+ # registers the real task), then evict that entry and register the fake
explicitly
+ # with `shared=False` so it stays out of the global finalizer set.
+ test_app.finalize()
+ test_app._tasks.pop(execute_name, None)
+ test_execute = test_app.task(name=execute_name, shared=False)(execute)
patch_app = mock.patch.object(celery_executor_utils, "app", test_app)
- if AIRFLOW_V_3_0_PLUS:
- celery_executor_utils.execute_workload.__wrapped__ = execute
- else:
- celery_executor_utils.execute_command.__wrapped__ = execute
-
patch_execute = mock.patch.object(celery_executor_utils, execute_name,
test_execute)
# Patch factory function so CeleryExecutor instances get the test app
patch_factory = mock.patch.object(celery_executor_utils,
"create_celery_app", return_value=test_app)