amoghrajesh opened a new pull request, #66463:
URL: https://github.com/apache/airflow/pull/66463
closes: https://github.com/apache/airflow/issues/66459
### What?
Task state rows currently accumulate indefinitely because there is no cleanup
mechanism. The AIP specifies that the backend is responsible for enforcing
retention via a periodic job, so that the table does not grow without bound.
Three things are needed:
1. time-based garbage collection (delete rows older than N days),
2. early expiry (a per-key override for things like short-lived job IDs), and
3. orphan cleanup for asset state, as mentioned in
https://github.com/apache/airflow/pull/66073#discussion_r3175794462: when an
asset is removed from all DAGs, its `asset_active` entry is deleted, but its
`asset_state` rows stay behind silently.
### Proposed change
- `expires_at` column on the `task_state` table: `updated_at` alone can't
distinguish a 7-day key from a 30-day key, so a per-row column is the only way
to honour per-key retention overrides. `NULL` means no override is present and
the global default applies; a set value means the row is deleted after that
timestamp, regardless of `updated_at`.
- `BaseStateBackend.cleanup(*, older_than: datetime)` no-op default — custom
backends can override this for programmatic garbage collection. Airflow calls
it from the scheduler.
- New config options under `[state_store]`: `default_retention_days = 30`
and `state_cleanup_interval = 86400`.
- `MetastoreStateBackend.cleanup()` runs three DELETEs in one pass: rows
past `updated_at` cutoff, rows with `expires_at < now()`, and `asset_state`
rows whose asset has no `asset_active` entry.
- `_cleanup_expired_task_state()` hooked into the scheduler via
`call_regular_interval(state_cleanup_interval, ...)`, consistent with how the
scheduler handles `_remove_unreferenced_triggers` and
`_update_asset_orphanage`. Calls `get_state_backend().cleanup()` — respects the
configured backend, works for both default and custom backends.
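
As a rough illustration of the three-DELETE pass described above, here is a minimal sketch against an in-memory SQLite database. It is not the code in this PR: the table layouts, the `asset_id` join column, and the `cleanup` function signature are simplified stand-ins, and restricting the first DELETE to rows with a `NULL` `expires_at` is an assumption about how "regardless of `updated_at`" is meant to work.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def cleanup(conn, *, older_than, now):
    """Run the three deletes in one transaction and return per-rule row counts."""
    with conn:  # commits on success, rolls back on error
        # 1. Global retention: rows without an override that are past the cutoff.
        #    (Assumption: rows with expires_at set are governed by rule 2 only.)
        stale = conn.execute(
            "DELETE FROM task_state WHERE expires_at IS NULL AND updated_at < ?",
            (older_than.timestamp(),),
        ).rowcount
        # 2. Early expiry: rows whose absolute expiry timestamp has passed.
        expired = conn.execute(
            "DELETE FROM task_state WHERE expires_at < ?",
            (now.timestamp(),),
        ).rowcount
        # 3. Orphans: asset_state rows whose asset has no asset_active entry.
        orphaned = conn.execute(
            "DELETE FROM asset_state WHERE NOT EXISTS ("
            "SELECT 1 FROM asset_active"
            " WHERE asset_active.asset_id = asset_state.asset_id)"
        ).rowcount
    return stale, expired, orphaned


# Hypothetical, simplified schema; timestamps are stored as epoch seconds.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE task_state (key TEXT PRIMARY KEY, updated_at REAL, expires_at REAL);
    CREATE TABLE asset_active (asset_id INTEGER PRIMARY KEY);
    CREATE TABLE asset_state (asset_id INTEGER, key TEXT);
    """
)

now = datetime(2026, 5, 6, 8, 50, tzinfo=timezone.utc)
recent = (now - timedelta(hours=1)).timestamp()
conn.executemany(
    "INSERT INTO task_state VALUES (?, ?, ?)",
    [
        # Recently updated, but with an expiry five minutes in the past.
        ("job_id", recent, (now - timedelta(minutes=5)).timestamp()),
        # No override, last updated more than one day ago.
        ("job_id_2", datetime(2026, 5, 4, tzinfo=timezone.utc).timestamp(), None),
        # No override, recently updated: should survive.
        ("job_id_3", recent, None),
    ],
)
conn.execute("INSERT INTO asset_active VALUES (1)")
conn.executemany("INSERT INTO asset_state VALUES (?, ?)", [(1, "kept"), (2, "orphan")])

counts = cleanup(conn, older_than=now - timedelta(days=1), now=now)
remaining = [row[0] for row in conn.execute("SELECT key FROM task_state ORDER BY key")]
print(counts, remaining)
```

In this toy data set each rule removes exactly one row, and only `job_id_3` is left in `task_state`.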
### Why choose scheduler and not `airflow db cleanup`?
- `airflow db cleanup` uses a single `--days-ago` flag for all tables, so
operators would have to manually align it with `default_retention_days`.
- It also can't handle `expires_at < now()` since that is an absolute cutoff
threshold, not relative to `--days-ago`.
- A dedicated CLI command was also considered but would require operators to
schedule it separately. The scheduler already runs this kind of periodic
cleanup for triggers and assets, so hooking in there keeps it automatic with no
operator action needed.
**Does running this in the scheduler have any performance implications? (cc: @ashb)**
`_cleanup_expired_task_state()` runs every 24 hours by default, the same
cadence as similar maintenance tasks. When it fires, it opens its own session
and runs three DELETEs. It can add some overhead, but that should be manageable
since the queries are simple.
The only real concern: if someone sets `state_cleanup_interval` very low and
has a large `task_state` table, the repeated DELETEs could put load on the DB.
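
To make the interval concern concrete, the sketch below models a repeating timer with the standard-library `sched` module and a fake clock. It only imitates the general shape of a "call at a regular interval" hook; `call_every`, `FakeClock`, and `simulate` are hypothetical helpers for this illustration, not Airflow APIs.

```python
import sched


class FakeClock:
    """Manually advanced clock so the simulation finishes instantly."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


def call_every(scheduler, interval, action):
    """Schedule `action` to run every `interval` seconds by re-arming itself."""

    def repeat():
        action()
        scheduler.enter(interval, 1, repeat)

    scheduler.enter(interval, 1, repeat)


def simulate(interval, horizon):
    """Return the simulated times at which the action fired up to `horizon`."""
    clock = FakeClock()
    scheduler = sched.scheduler(clock.time, clock.sleep)
    fired_at = []
    call_every(scheduler, interval, lambda: fired_at.append(clock.time()))
    while clock.time() <= horizon:
        # Run whatever is due, then jump the clock to the next event.
        wait = scheduler.run(blocking=False)
        if wait:
            clock.sleep(wait)
    return fired_at


three_days = 3 * 86400
daily = simulate(interval=86400, horizon=three_days)
aggressive = simulate(interval=300, horizon=three_days)
print(len(daily), len(aggressive))
```

Over three simulated days the default interval of 86400 seconds fires three times, while an interval of 300 seconds fires 864 times, each firing being one round of three DELETEs.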
### User implications / backcompat
New config options under `[state_store]` with safe defaults — no action
needed to maintain existing behaviour. The `expires_at` column is nullable;
existing rows get `NULL` (global default retention applies).
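
The no-op default on `cleanup` plays a similar backward-compatibility role for custom backends: a backend that never overrides it keeps working unchanged. A minimal sketch of that pattern follows, with stand-in class names (`StateBackendSketch`, `InMemoryBackend`) that are assumptions for illustration rather than the real Airflow classes.

```python
from datetime import datetime, timedelta, timezone


class StateBackendSketch:
    """Stand-in for a base backend; only the cleanup hook is modelled."""

    def cleanup(self, *, older_than: datetime) -> None:
        # No-op by default, so existing custom backends need no changes.
        return None


class InMemoryBackend(StateBackendSketch):
    """Hypothetical custom backend that opts in to garbage collection."""

    def __init__(self) -> None:
        self.rows: dict[str, datetime] = {}  # key -> updated_at

    def set(self, key: str, updated_at: datetime) -> None:
        self.rows[key] = updated_at

    def cleanup(self, *, older_than: datetime) -> None:
        # Keep only rows updated at or after the cutoff.
        self.rows = {k: ts for k, ts in self.rows.items() if ts >= older_than}


now = datetime(2026, 5, 6, 8, 40, tzinfo=timezone.utc)
backend = InMemoryBackend()
backend.set("fresh", now - timedelta(hours=2))
backend.set("stale", now - timedelta(days=3))

# The caller passes the cutoff as a keyword-only argument.
backend.cleanup(older_than=now - timedelta(days=1))
print(sorted(backend.rows))
```

A caller can invoke `cleanup(older_than=...)` on any backend without checking its type; backends that do not override the method simply do nothing.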
### Testing
#### Test setup
1. Started breeze with these configurations:
```shell script
export AIRFLOW__STATE_STORE__STATE_CLEANUP_INTERVAL=300
export AIRFLOW__STATE_STORE__DEFAULT_RETENTION_DAYS=1
```
2. Created a dag run and pushed in 3 task states for it:
<img width="1699" height="890" alt="image (76)"
src="https://github.com/user-attachments/assets/2e328b8f-4288-4571-8c8f-71a28e197110"
/>
#### Testing for `updated_at` / global cleanup
1. Ran this query:
```sql
UPDATE task_state
SET updated_at = '2026-05-04 00:00:00+00:00'
WHERE key = 'job_id_2';
```
2. Waited for the scheduler to run the GC job and rows to be deleted.
```shell script
2026-05-06T08:40:57.899773Z [info ] Adopting or resetting orphaned tasks for active dag runs [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:2786
2026-05-06T08:40:57.904527Z [info ] Running task state cleanup [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:3066 older_than=datetime.datetime(2026, 5, 5, 8, 40, 57, 904487, tzinfo=Timezone('UTC')) retention_days=1
```
<img width="1699" height="890" alt="image (77)"
src="https://github.com/user-attachments/assets/5bfa9d7d-b0c0-4cc6-954d-e82c0f62506a"
/>
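
The `older_than` value in the log above is consistent with subtracting `retention_days` from the current time. The arithmetic can be checked with a few lines of standard-library Python; the `now` value here is inferred from the logged cutoff and is an assumption, as are the variable names.

```python
from datetime import datetime, timedelta, timezone

# Assumed "current time" at which the scheduler computed the cutoff.
now = datetime(2026, 5, 6, 8, 40, 57, 904487, tzinfo=timezone.utc)
retention_days = 1

# Relative cutoff: anything last updated before this is eligible for deletion.
older_than = now - timedelta(days=retention_days)

# The row that was back-dated by the UPDATE statement in step 1.
updated_at = datetime(2026, 5, 4, tzinfo=timezone.utc)
is_stale = updated_at < older_than

print(older_than.isoformat())  # 2026-05-05T08:40:57.904487+00:00
print(is_stale)  # True
```

With a one-day retention, the row for `job_id_2` (updated on 2026-05-04) falls before the cutoff, which matches it being deleted on the next cleanup run.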
#### Testing for `expires_at`
1. Ran this query:
```sql
UPDATE task_state
SET expires_at = '2026-05-06 08:45:00+00:00'
WHERE key = 'job_id';
```
<img width="1699" height="890" alt="image (78)"
src="https://github.com/user-attachments/assets/2559abd2-055e-4374-9ce1-5ded1b85d683"
/>
2. Waited for the scheduler to run the GC job and delete the row (note that
the row with `expires_at` set is cleared even though its `updated_at` has not
yet passed the retention cutoff).
<img width="1699" height="890" alt="image (79)"
src="https://github.com/user-attachments/assets/85775ccb-5674-4dde-89a6-c474c6070846"
/>
### What's next
- `clear_on_success` hook: https://github.com/apache/airflow/issues/66460
- `task_state.set(retention_days=N)` API to populate `expires_at` at write
time: https://github.com/apache/airflow/issues/66461
---
##### Was generative AI tooling used to co-author this PR?
- [ ] Yes (please specify the tool below)