seanmuth opened a new issue, #67123:
URL: https://github.com/apache/airflow/issues/67123
## Apache Airflow Provider(s)
- `apache-airflow-providers-celery` (regression introduced in 3.16.0,
present through current)
## Versions of Apache Airflow Providers
- `apache-airflow-providers-celery==3.16.0` (introducing release) through
**`3.19.0` (current)** and `main`. Verified the per-publish
`create_celery_app(_conf)` call in `send_workload_to_executor` is unchanged
across all releases since 3.16.0.
- One partial optimization has landed since: `get_celery_configuration()` is
now `@cache`-decorated, so the config *dict* is built once. The `Celery()` app
instance itself is still constructed fresh on every publish, so `_backend` is
still `None` at the start of every `apply_async` call and the `entry_points()`
scan still runs per publish. The mechanics of the regression are identical to
3.16.0.
- `apache-airflow-providers-cncf-kubernetes` is referenced as a
counterexample.
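A minimal sketch of why caching the config dict alone does not help. `FakeCelery`, `get_config`, and `create_app` are hypothetical stand-ins, not the provider's code. They only model the behavior described above: the dict is built once, but every call still yields a new app whose backend slot is unresolved.

```python
from functools import cache


class FakeCelery:
    """Hypothetical stand-in for celery.Celery: holds config and a lazy backend slot."""

    def __init__(self, config: dict) -> None:
        self.config = config
        self._backend = None  # unresolved on every new instance


@cache
def get_config() -> dict:
    # Models the now-@cache-decorated get_celery_configuration(): built once.
    return {"result_backend": "redis://localhost:6379/0"}


def create_app() -> FakeCelery:
    # Models create_celery_app(): a new app object on every call.
    return FakeCelery(get_config())


first, second = create_app(), create_app()
print(first.config is second.config)  # True: the config dict is shared
print(first is second)                # False: the app is rebuilt
print(second._backend is None)        # True: backend resolution starts over
```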
## Apache Airflow version
Reproduced on **Airflow 2.11.0** (Astro Runtime 13.4.0). The relevant code
path is present on `main` as well.
## What happened?
Since providers-celery 3.16.0 (PR #60675 — *"AIP-67 - Multi Team: Update
Celery Executor to support multi team"*), `send_task_to_executor` constructs a
fresh `Celery()` app on every task publish:
```python
# providers/celery/.../celery_executor_utils.py
def send_task_to_executor(task_tuple):
    key, args, queue, team_name = task_tuple
    ...
    celery_app = create_celery_app(_conf)  # NEW: fresh Celery() per publish
    task_to_run = celery_app.tasks["execute_workload"]
    ...
    with timeout(seconds=OPERATION_TIMEOUT):
        result = task_to_run.apply_async(args=args, queue=queue)
```
The PR's own commit message acknowledges this explicitly:
> Since sending tasks is parallelized with multiple processes (which do not
share memory with the parent) the send task logic now re-creates a celery app
in the sub processes (since the pickling and unpickling that python does to try
pass state to the sub processes was not reliably creating the correct celery
app objects).
Each `Celery()` instance starts with `_backend = None`. The first thing
`apply_async` does is `app.send_task()`, which accesses `self.backend`, which
lazy-resolves via `_get_backend()` → `backends.by_url()` →
`backends.by_name()`, which in turn calls
**`load_extension_class_names("celery.result_backends")`**, which runs
`importlib.metadata.entry_points()` — walking every installed distribution to
read its `entry_points.txt` and merge plugin-registered backends into the alias
map. **The scan is run unconditionally; setting `[celery] result_backend`
explicitly does not skip it.**
Pre-3.16.0, `app = _get_celery_app()` was a module-level singleton. The lazy
backend resolution happened once per subprocess lifetime, then was cached on
the `Celery` instance. Post-3.16.0, the cache is thrown away every publish, so
the scan runs once per task enqueued.
This scan's cost scales linearly with the number of installed distributions
on disk. On realistic production images (~600+ `*.dist-info` entries), each
publish now pays tens of milliseconds even on the quiet path. Under scheduler
load and memory pressure, the tail extends past the default 1.0s
`operation_timeout`, causing publishes to fail with `AirflowTaskTimeout`,
exhaust the configured retry budget (default 3), and end up logged as:
```
{celery_executor.py:174} INFO - [Try 1 of 3] Task Timeout Error for Task: ...
{celery_executor.py:174} INFO - [Try 2 of 3] Task Timeout Error for Task: ...
{celery_executor.py:174} INFO - [Try 3 of 3] Task Timeout Error for Task: ...
{celery_executor.py:213} ERROR - Error sending Celery task: Timeout, PID: NNN
```
Retries hit the same slow path inside the same subprocess (the new
`Celery()` instance's `_backend` is still `None`), so retrying does not recover.
## Quantitative impact
Measured on a controlled astro-runtime 13.4.0 deployment matching a real
production deployment's scheduler resources (1 vCPU, 2 GiB) and roughly its
`*.dist-info` count (656 distributions). Each iteration mirrors the per-publish
path: `create_celery_app(conf)` + `_ = app.backend`. Wall-clock latency in
milliseconds, n=100 per row:
| Configuration | min (ms) | p50 (ms) | p95 (ms) | p99 (ms) | max (ms) | iter/s |
|---|---:|---:|---:|---:|---:|---:|
| Idle, no load | 48.1 | 51.1 | 67.0 | 82.0 | 86.4 | 18.8 |
| Idle + 1.5 GiB anon allocation | 48.2 | 52.2 | 88.8 | 94.9 | 100.4 | 17.5 |
| 5 modest DAGs with mapped tasks running | 52.2 | 88.6 | 232.0 | 362.7 | 385.3 | 9.6 |
| Same DAG load + 1.0 GiB anon allocation | **51.0** | **111.8** | **498.3** | **697.9** | **717.8** | **5.7** |
For comparison, a real production deployment with this regression (~682
distributions, ~640 active DAGs, periodic OOMs) shows `max=558ms` and
consistent `Task Timeout Error` failures at the default 1.0s
`operation_timeout`. The controlled reproduction *exceeds* that worst case
with just 5 DAGs, which suggests that the regression's impact is multiplicative
across three independent factors:
- Installed distribution count (baseline scan cost)
- Scheduler activity / CPU contention with the publisher subprocess
- Memory pressure causing `entry_points.txt` page-cache eviction
In the field, all three compound simultaneously. The 1.0s
`operation_timeout` default — appropriate for the pre-regression path where
backend resolution was amortized — is no longer safe at typical production
deployment sizes.
## Counterexample: KubernetesExecutor's AIP-67 implementation does not have this problem
PR #61798 implemented the same AIP-67 multi-team feature for
`KubernetesExecutor`. The pattern is cleaner:
```python
# providers/cncf/kubernetes/.../executors/kubernetes_executor.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    ...
    self.kube_config = KubeConfig(executor_conf=self.conf)  # once per executor
    ...
```
`KubeConfig` is parameterized on the team-aware `executor_conf` but
constructed *once*, at executor init, and held on the instance. No per-publish
reconstruction.
The KubernetesExecutor publishes via a long-lived multiprocessing worker that
reads from a managed queue and holds its Kubernetes client across publishes.
By contrast, the CeleryExecutor's per-publish
`ProcessPoolExecutor.submit(send_task_to_executor, ...)` design forced the
AIP-67 author to push the team-aware app construction *inside* the per-call
function rather than ahead of it. The pickling concern cited in the commit
message was specifically about passing app state through the pool boundary,
not about any semantic need for a fresh app per task.
In other words, the per-publish reconstruction is **not an inherent
requirement of multi-team support**. It's an artifact of the way the celery
publish pool was already shaped. A subprocess-local cache keyed on `team_name`
would satisfy the multi-team correctness requirement without re-paying the
backend-resolution cost on every publish.
## Proposed fix
Cache the constructed `Celery` app at module level inside the publisher
subprocess, keyed on team:
```python
# providers/celery/.../celery_executor_utils.py
from __future__ import annotations

from functools import lru_cache

from celery import Celery


@lru_cache(maxsize=8)  # one app per active team in a given subprocess
def _cached_celery_app_for_team(team_name: str | None) -> Celery:
    """
    Subprocess-local Celery app cache keyed on team_name.

    Subprocesses don't share memory with the parent, so this cache is
    per-subprocess. Within a subprocess, calls for the same team reuse the
    same Celery() instance, preserving the cached backend resolution that
    pre-3.16.0 enjoyed module-globally.
    """
    if AIRFLOW_V_3_0_PLUS:
        from airflow.executors.base_executor import ExecutorConf

        _conf = ExecutorConf(team_name)
    else:
        _conf = conf
    return create_celery_app(_conf)


def send_task_to_executor(task_tuple):
    key, args, queue, team_name = task_tuple
    celery_app = _cached_celery_app_for_team(team_name)  # cached, not reconstructed
    if AIRFLOW_V_3_0_PLUS:
        task_to_run = celery_app.tasks["execute_workload"]
        ...
    else:
        task_to_run = celery_app.tasks["execute_command"]
        ...
    with timeout(seconds=OPERATION_TIMEOUT):
        result = task_to_run.apply_async(args=args, queue=queue)
```
This preserves AIP-67's multi-team correctness (apps are still team-specific
where needed) while restoring per-subprocess amortization for the
`_get_backend()` → `entry_points()` cost. The fix is local to one function,
requires no API changes, and aligns the CeleryExecutor's pattern with how
KubernetesExecutor handled the same AIP-67 requirement.
The `maxsize=8` is a guess at a reasonable per-subprocess team-count ceiling
and should be configurable, defaulting to something modest. For single-team
deployments (the vast majority), `team_name` is `None` and a single cache entry
covers all publishes.
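Here is a small self-contained sketch of how the proposed cache behaves, with no Celery or Airflow required. `create_celery_app_stub`, `constructions`, and `make_team_app_cache` are hypothetical stand-ins. Passing `maxsize` as a parameter is just one way to make it configurable, and where the value would come from is left open. The sketch counts app builds for a single-team and a two-team subprocess and shows the `maxsize` trade-off.

```python
from __future__ import annotations

from functools import lru_cache
from typing import Callable

constructions: list[str | None] = []  # records every (stub) app build


def create_celery_app_stub(team_name: str | None) -> object:
    """Hypothetical stand-in for create_celery_app(ExecutorConf(team_name))."""
    constructions.append(team_name)
    return object()


def make_team_app_cache(maxsize: int) -> Callable[[str | None], object]:
    """Build a per-process, team-keyed LRU cache around the app factory."""

    @lru_cache(maxsize=maxsize)
    def cached_app_for_team(team_name: str | None) -> object:
        return create_celery_app_stub(team_name)

    return cached_app_for_team


cached_app_for_team = make_team_app_cache(maxsize=8)

for _ in range(100):  # single-team deployment: team_name is None
    cached_app_for_team(None)
print(len(constructions))  # 1

for team in ["a", "b", "a", "b"]:  # two teams sharing one subprocess
    cached_app_for_team(team)
print(constructions)  # [None, 'a', 'b']

# If maxsize is below the number of teams a subprocess sees, LRU eviction
# brings back per-publish construction for alternating teams.
constructions.clear()
tiny = make_team_app_cache(maxsize=1)
for team in ["a", "b", "a", "b"]:
    tiny(team)
print(len(constructions))  # 4
```

The last case is why a modest default should still be at least the number of teams a single publisher subprocess is expected to serve.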
## How to reproduce
The reproduction setup is summarized inline below so the steps are
self-contained (happy to share the full project skeleton if useful):
1. Astro Runtime 13.4.0 (Airflow 2.11.0), 1 vCPU / 2 GiB scheduler.
2. `providers-celery==3.17.2`, `celery==5.6.2`.
3. Inflate the image's distribution count to ~656 via `requirements.txt` (a
wide set of unused providers and utility packages — count is what matters, not
which ones).
4. Five synthetic DAGs with `*/1 * * * *` and `*/2 * * * *` schedules and
mapped tasks producing ~380 task enqueues/minute.
5. Inside the scheduler pod, run a 100-iteration timing script measuring
`create_celery_app(conf); _ = app.backend`. Then layer in a `bytearray(1024 *
1024 * 1024)` allocation in a second shell. Observe `max` cross 700ms; observe
`Task Timeout Error for Task` log entries firing.
Downgrade `providers-celery` to 3.15.x in the same image; the same
measurement collapses to roughly the idle baseline regardless of load +
pressure, because the entry-points scan is amortized over subprocess lifetime.
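Here is a stdlib-only sketch of what such a timing script could look like. It is not the script used for the numbers in the table above. By default it times only the `importlib.metadata` walk of the `celery.result_backends` entry-point group as a proxy. Inside a scheduler, one would swap in the `create_celery_app(conf)` plus `.backend` call from step 5. The percentile method (`statistics.quantiles(..., method="inclusive")`) is an assumption, so values may differ slightly from other tools. Memory pressure is not applied here and still comes from the separate `bytearray` allocation.

```python
import statistics
import time
from importlib.metadata import entry_points
from typing import Callable


def summarize(samples_ms: list[float], wall_s: float) -> dict[str, float]:
    """Reduce per-iteration latencies (ms) to min/p50/p95/p99/max and iter/s."""
    cuts = statistics.quantiles(samples_ms, n=100, method="inclusive")  # 99 cut points
    return {
        "min": min(samples_ms),
        "p50": cuts[49],
        "p95": cuts[94],
        "p99": cuts[98],
        "max": max(samples_ms),
        "iter_per_s": len(samples_ms) / max(wall_s, 1e-9),
    }


def time_per_publish(fn: Callable[[], object], iterations: int = 100) -> dict[str, float]:
    """Call fn() `iterations` times and summarize wall-clock latency in ms."""
    samples_ms: list[float] = []
    start = time.perf_counter()
    for _ in range(iterations):
        t0 = time.perf_counter()
        fn()
        samples_ms.append((time.perf_counter() - t0) * 1000.0)
    return summarize(samples_ms, time.perf_counter() - start)


def scan_only() -> None:
    # Proxy for the per-publish cost: the entry-point walk alone, without celery.
    # Inside the scheduler, swap in something like:
    #   lambda: create_celery_app(conf).backend
    list(entry_points(group="celery.result_backends"))


if __name__ == "__main__":
    for name, value in time_per_publish(scan_only).items():
        print(f"{name:>10}: {value:8.1f}")
```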
## Operating system
Reproduction on `linux/amd64` (astro-runtime base, Debian-based). Not
OS-specific; the regression is in Python-level code.
## Deployment
Other 3rd-party Helm chart / managed deployment (Astronomer Hosted
Execution). Same root cause is present in any deployment of Airflow 2.11.0+ or
3.x with `apache-airflow-providers-celery >= 3.16.0`.
## Anything else?
- Workarounds that *do not* help: setting `[celery] result_backend`
explicitly (the entry-points scan runs unconditionally inside `by_name`);
pinning to `celery==5.5.x` (same code path on Python 3.11+); removing the
`importlib_metadata` backport (it's a transitive `apache-airflow` dependency).
- Workarounds that *do* help, in priority order: raise `[celery]
operation_timeout` to a generous value (e.g. 2-3s), which stops the cliff
failure but leaves the per-publish cost in place; or downgrade to
`providers-celery==3.15.x` if multi-team is not required.
- Disabling `AIRFLOW__CORE__MULTI_TEAM` (or running on Airflow 2.x where it
doesn't exist) does **not** mitigate, because the per-publish
`create_celery_app` is unconditional and not gated on whether multi-team is
active.
- `AIRFLOW__CORE__LAZY_LOAD_PLUGINS` has no effect — the scan walks
installed distributions on disk, not loaded modules.
- The OOM symptom (a memory-pressure feedback loop) often shows up alongside
this regression in deployments with tight scheduler memory limits. Reducing
the per-publish cost also shrinks the publish burst during orphaned-task
adoption on restart, which indirectly helps the OOM picture.
## Are you willing to submit a PR?
- [x] Yes
## Code of Conduct
- [x] I agree to follow this project's Code of Conduct
---
Drafted-by: Claude Code (Opus 4.7); reviewed by @seanmuth before posting