amoghrajesh opened a new pull request, #68283:
URL: https://github.com/apache/airflow/pull/68283
<!-- SPDX-License-Identifier: Apache-2.0
https://www.apache.org/licenses/LICENSE-2.0 -->
---
##### Was generative AI tooling used to co-author this PR?
- [x] Yes: Claude Sonnet 4.6
Built on top of https://github.com/apache/airflow/pull/68274 (only the last commit is relevant).
### What
AIP-103 introduced task and asset state management backed by the metadata
database. For deployments that need to keep large state values out of the
database, this PR adds an object storage backend to
`apache-airflow-providers-common-io`, similar to the existing object storage
XCom backend:
https://airflow.apache.org/docs/apache-airflow-providers-common-io/stable/xcom_backend.html
### Current behaviour
Out of the box, task and asset state is stored only in the Airflow metadata
database (`MetastoreStateBackend`). The state backend is pluggable, so values
could in principle be offloaded to S3, GCS, Azure Blob, or any other object
storage, but no such backend is provided today.
### Proposed change
Adds `StoreObjectStorageBackend` to `providers/common/io`, which stores task
and asset state on object storage using `ObjectStoragePath`. The backend
supports:
- **Threshold-based offloading**: with `store_objectstorage_threshold = 0`
(the default), all values are offloaded to object storage; set a positive byte
count to keep small values inline in the database and offload only values
larger than that.
- **Optional compression**: set `store_objectstorage_compression = gzip` (or
any other fsspec-supported codec).
- **Stable paths across retries**: task state is keyed on `(dag_id, run_id,
task_id, map_index)`, so every try of a task reads and writes the same objects.
This makes the backend suitable for operators using `ResumableJobMixin`.
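A minimal sketch of why paths stay stable across retries: the object key is derived from the task's identity only, with `try_number` left out. The layout `<prefix>/<dag_id>/<run_id>/<task_id>/<map_index>/<key>` and the names `TaskIdentity` and `task_state_path` are hypothetical, chosen for illustration (the testing section below only shows paths of the form `task-state/example_task_store/.../job_id`); this is not the backend's actual path scheme.

```python
from dataclasses import dataclass
from posixpath import join
from urllib.parse import quote


@dataclass(frozen=True)
class TaskIdentity:
    """Hypothetical stand-in for the fields task state is keyed on."""

    dag_id: str
    run_id: str
    task_id: str
    map_index: int = -1  # -1 is Airflow's convention for an unmapped task


def task_state_path(base: str, ident: TaskIdentity, key: str) -> str:
    """Build an object key that is identical for every try of the same task.

    try_number is intentionally not part of the path, so a retry reads the
    object written by the previous attempt.
    """
    parts = [ident.dag_id, ident.run_id, ident.task_id, str(ident.map_index), key]
    # Percent-encode each component so a '/' inside an id cannot add path levels.
    return join(base, *(quote(p, safe="") for p in parts))


ident = TaskIdentity("example_task_store", "manual__2024-01-01", "submit_job")
first_try = task_state_path("task-state/", ident, "job_id")
retry = task_state_path("task-state/", ident, "job_id")  # same identity, same key
print(first_try == retry, first_try)
```

Keeping `map_index` in the key means each mapped task instance gets its own state, while retries of one instance share it.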
Enable it in `airflow.cfg` (or through environment variables, using `COMMON_IO`
as the section name, e.g. `AIRFLOW__COMMON_IO__STORE_OBJECTSTORAGE_PATH`):
```ini
[state_store]
backend = airflow.providers.common.io.store.backend.StoreObjectStorageBackend

[common.io]
store_objectstorage_path = s3://conn_id@mybucket/task-state/
store_objectstorage_threshold = 0
# optional
store_objectstorage_compression = gzip
```
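The environment-variable names follow Airflow's `AIRFLOW__{SECTION}__{KEY}` convention. A small sketch of that mapping, showing why the `common.io` section becomes `COMMON_IO`; the helper `airflow_env_var` is illustrative only and not an Airflow API.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Return the AIRFLOW__{SECTION}__{KEY} environment variable for a config option.

    Dots in the section name become underscores, so "common.io" maps to
    "COMMON_IO", matching the variable names used in this PR.
    """
    return f"AIRFLOW__{section.replace('.', '_').upper()}__{key.upper()}"


for option in (
    "store_objectstorage_path",
    "store_objectstorage_threshold",
    "store_objectstorage_compression",
):
    print(airflow_env_var("common.io", option))
```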
### Changes of Note
Values persist until explicitly deleted. Use your object storage provider's
lifecycle policies (S3 lifecycle rules, GCS object lifecycle, etc.) to expire
old state automatically.
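For S3 (and S3-compatible stores that implement the lifecycle API), a rule that expires state under the configured prefix might look like the following sketch. The 30-day window and the helper name `expire_prefix_rule` are assumptions; the dict follows the `LifecycleConfiguration` structure of the S3 API.

```python
import json


def expire_prefix_rule(prefix: str, days: int) -> dict:
    """Build an S3 lifecycle configuration that expires objects under `prefix`.

    The dict follows the LifecycleConfiguration shape accepted by boto3's
    S3 client method put_bucket_lifecycle_configuration().
    """
    if days < 1:
        raise ValueError("S3 expiration days must be a positive integer")
    return {
        "Rules": [
            {
                "ID": f"expire-{prefix.strip('/')}",  # arbitrary, human-readable
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Expiration": {"Days": days},
            }
        ]
    }


config = expire_prefix_rule("task-state/", days=30)
print(json.dumps(config, indent=2))
# To apply (requires boto3 and credentials; bucket name from the testing setup):
# boto3.client("s3").put_bucket_lifecycle_configuration(
#     Bucket="airflow-task-state", LifecycleConfiguration=config
# )
```

Pick an expiry comfortably longer than the longest retry window: once a stored job ID expires, a later retry can no longer reattach to that job.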
The backend requires Airflow >= 3.3, the release that introduced
`BaseStoreBackend` and the `AssetScope`/`TaskScope` types.
### User implications / backcompat
This is a new opt-in feature; existing deployments are unaffected. Using it
requires an Airflow connection with the object storage credentials (e.g. an
`endpoint_url` extra for MinIO).
### Testing
Verified end-to-end using the `example_task_store` example Dag against a local
MinIO instance. Three scenarios were tested:
#### Setup
1. Start MinIO:
`docker run -p 29000:9000 -p 29001:9001 -e MINIO_ROOT_USER=minioadmin -e MINIO_ROOT_PASSWORD=minioadmin quay.io/minio/minio server /data --console-address ":9001"`
2. Create the bucket `airflow-task-state` in the MinIO console
(`http://localhost:29001`).
<img width="1720" height="893" alt="image"
src="https://github.com/user-attachments/assets/528a9d0e-166e-4608-aa65-780961373e13"
/>
3. Add an Airflow connection `minio` with `conn_type=aws`,
`login=minioadmin`, `password=minioadmin`, `extra={"endpoint_url":
"http://host.docker.internal:29000"}` (use `host.docker.internal` when running
inside Breeze/Docker).
4. Set the base env vars and restart Airflow so that all of its components,
including the workers, inherit them:
```shell
export AIRFLOW__WORKERS__STATE_STORE_BACKEND=airflow.providers.common.io.store.backend.StoreObjectStorageBackend
export AIRFLOW__COMMON_IO__STORE_OBJECTSTORAGE_PATH=s3://minio@airflow-task-state/task-state/
export AIRFLOW__COMMON_IO__STORE_OBJECTSTORAGE_THRESHOLD=0
export AIRFLOW_CONN_MINIO='{"conn_type": "aws", "login": "minioadmin", "password": "minioadmin", "extra": {"endpoint_url": "http://host.docker.internal:29000"}}'
```
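Before restarting, a quick stdlib-only check can confirm that the pieces line up: the user part of the storage path names the connection, and `AIRFLOW_CONN_MINIO` holds valid JSON. This assumes the `scheme://conn_id@bucket/prefix` layout used in the examples above; `split_store_path` is a hypothetical helper, not part of the provider.

```python
import json
from urllib.parse import urlsplit


def split_store_path(path: str) -> dict:
    """Hypothetical helper: break scheme://conn_id@bucket/prefix into its parts."""
    parts = urlsplit(path)
    return {
        "scheme": parts.scheme,  # storage protocol, e.g. "s3"
        "conn_id": parts.username,  # Airflow connection holding the credentials
        "bucket": parts.hostname,  # note: urlsplit lowercases the hostname
        "prefix": parts.path.lstrip("/"),
    }


store = split_store_path("s3://minio@airflow-task-state/task-state/")
# Connection "minio" is looked up from the environment as AIRFLOW_CONN_MINIO.
conn_env_var = f"AIRFLOW_CONN_{store['conn_id'].upper()}"

# The connection value exported in step 4 must parse as JSON.
conn = json.loads(
    '{"conn_type": "aws", "login": "minioadmin", "password": "minioadmin", '
    '"extra": {"endpoint_url": "http://host.docker.internal:29000"}}'
)
print(store, conn_env_var, conn["extra"]["endpoint_url"])
```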
**Scenario 1 — All values offloaded (threshold=0)**
```shell
AIRFLOW__COMMON_IO__STORE_OBJECTSTORAGE_THRESHOLD=0
```
Verified: files appear in MinIO under `task-state/example_task_store/.../`.
On try 1, the job ID is written and the task intentionally fails. On try 2, the
job ID is read back from MinIO and the task reattaches to the existing job.
Try 1:
<img width="1720" height="893" alt="image"
src="https://github.com/user-attachments/assets/ab83049a-02a6-497f-801b-5c56b06e6a96"
/>
Try 2:
<img width="1720" height="893" alt="image"
src="https://github.com/user-attachments/assets/ba5b8ce6-90e9-4663-be35-d692372bcbb4"
/>
Task store tab:
<img width="1720" height="893" alt="image"
src="https://github.com/user-attachments/assets/871d4b37-02ab-4155-884d-596ee0f21234"
/>
MinIO:
<img width="1720" height="893" alt="image"
src="https://github.com/user-attachments/assets/6db801ef-6bff-40c1-b000-10c99f593966"
/>
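The try 1 / try 2 behaviour in this scenario is the resumable pattern that stable keys make possible. Below is a minimal, self-contained simulation of it, with an in-memory dict in place of MinIO. `FakeStateStore`, `run_try` and `submit_job` are hypothetical stand-ins, not `ResumableJobMixin` or the backend's API.

```python
from __future__ import annotations

from typing import Callable


class FakeStateStore:
    """Hypothetical in-memory stand-in for the state backend.

    Keys deliberately exclude try_number, mirroring the stable-path property.
    """

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], str] = {}

    def get(self, task_key: str, name: str) -> str | None:
        return self._data.get((task_key, name))

    def set(self, task_key: str, name: str, value: str) -> None:
        self._data[(task_key, name)] = value


def run_try(
    store: FakeStateStore,
    task_key: str,
    submit_job: Callable[[], str],
    fail: bool = False,
) -> tuple[str, str]:
    """One task attempt: reattach to a recorded job if present, else submit one."""
    job_id = store.get(task_key, "job_id")
    if job_id is None:
        job_id = submit_job()
        # Persist the job id before anything else can fail.
        store.set(task_key, "job_id", job_id)
        action = "submitted"
    else:
        action = "reattached"
    if fail:
        raise RuntimeError(f"attempt failed after it {action} {job_id}")
    return action, job_id


# Usage: try 1 submits and then fails; try 2 finds the job id and reattaches.
store = FakeStateStore()
submitted: list[str] = []


def submit_job() -> str:
    submitted.append(f"job-{len(submitted) + 1}")
    return submitted[-1]


try:
    run_try(store, "example_task_store/run_1/submit/-1", submit_job, fail=True)
except RuntimeError as exc:
    print("try 1:", exc)
print("try 2:", run_try(store, "example_task_store/run_1/submit/-1", submit_job))
```

The key design point is ordering: the job ID is written before the attempt can fail, so a retry never submits a duplicate job.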
---
**Scenario 2 — Threshold: small values stay in DB, large ones go to MinIO**
```shell
AIRFLOW__COMMON_IO__STORE_OBJECTSTORAGE_THRESHOLD=50
```
Result:
Of the four task store values, only `result` is larger than 50 bytes, so it is
the only one stored in MinIO; the other three remain in the database.
Try 1:
<img width="1720" height="893" alt="image"
src="https://github.com/user-attachments/assets/d8d861ae-2c57-4e14-b4c4-e2e2313d0a83"
/>
Try 2:
<img width="1720" height="893" alt="image"
src="https://github.com/user-attachments/assets/98c86fbd-6e10-4125-a317-197fbe4ab47a"
/>
<img width="1720" height="893" alt="image"
src="https://github.com/user-attachments/assets/902f61c0-f9d9-4d9f-83e7-ff6f8c758c6a"
/>
<img width="1720" height="893" alt="image"
src="https://github.com/user-attachments/assets/06c7b907-8519-4d00-af79-d67f3a320d8f"
/>
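The threshold decision itself is a size comparison on the serialized value. A sketch, assuming JSON serialization and a strict greater-than comparison (consistent with the result above, but not confirmed from the backend code); `serialized_size`, `should_offload` and the sample values are made up for illustration.

```python
import json


def serialized_size(value: object) -> int:
    """Size in bytes of the value serialized as JSON (an assumed serialization)."""
    return len(json.dumps(value).encode("utf-8"))


def should_offload(value: object, threshold: int) -> bool:
    """True if the value would be written to object storage.

    With threshold == 0 every value qualifies, because any serialized value
    is at least one byte long; otherwise only values strictly larger than
    the threshold leave the database.
    """
    return serialized_size(value) > threshold


# Made-up values standing in for the four task store entries.
values = {
    "job_id": "job-1234",
    "status": "RUNNING",
    "attempt": 2,
    "result": {"rows": list(range(20))},
}
for name, value in values.items():
    where = "object storage" if should_offload(value, 50) else "database"
    print(f"{name}: {serialized_size(value)} bytes -> {where}")
```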
---
**Scenario 3 — Compression with gzip**
```shell
AIRFLOW__COMMON_IO__STORE_OBJECTSTORAGE_THRESHOLD=0
AIRFLOW__COMMON_IO__STORE_OBJECTSTORAGE_COMPRESSION=gzip
```
Verified: files appear in MinIO with a `.gz` suffix (e.g.
`task-state/example_task_store/.../job_id.gz`). Decompression is inferred
automatically on read (`compression="infer"`). Task completes successfully.
Try 1:
<img width="1720" height="893" alt="image"
src="https://github.com/user-attachments/assets/3e20a39f-27d9-4251-a395-24e651d4677c"
/>
Try 2:
<img width="1720" height="893" alt="image"
src="https://github.com/user-attachments/assets/bfc763c7-ef53-4e90-9694-c2c761dd0af7"
/>
MinIO:
<img width="1720" height="893" alt="image"
src="https://github.com/user-attachments/assets/a308a96f-0c7e-4fbd-a434-c768c4802048"
/>
The offloaded `result.gz` object can also be downloaded from MinIO and opened
locally:
```shell
cd ~/Downloads
open result.gz
```
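The compressed write/read round trip in this scenario can be sketched with the standard library alone. This assumes JSON serialization and a `.gz` suffix on compressed objects (as observed above); `write_value`, `read_value` and the `CODECS` table are hypothetical, and the real backend appears to delegate compression to fsspec rather than calling `gzip` directly.

```python
from __future__ import annotations

import gzip
import json

# Hypothetical codec table: suffix -> (codec name, compress, decompress).
# It mimics, in spirit, fsspec's compression="infer", which picks the codec
# from the file extension on read.
CODECS = {".gz": ("gzip", gzip.compress, gzip.decompress)}


def write_value(key: str, value: object, compression: str | None = None) -> tuple[str, bytes]:
    """Serialize a value, optionally compress it, and return (object name, payload)."""
    payload = json.dumps(value).encode("utf-8")
    for suffix, (name, compress, _) in CODECS.items():
        if compression == name:
            return key + suffix, compress(payload)
    if compression is not None:
        raise ValueError(f"unsupported compression: {compression}")
    return key, payload


def read_value(name: str, payload: bytes) -> object:
    """Infer the codec from the object name's suffix, then deserialize."""
    for suffix, (_, _, decompress) in CODECS.items():
        if name.endswith(suffix):
            payload = decompress(payload)
            break
    return json.loads(payload.decode("utf-8"))


name, blob = write_value("job_id", "job-1234", compression="gzip")
print(name, read_value(name, blob))
```

Because the codec is inferred from the object name, reading needs no compression setting, so objects written with and without compression can coexist under the same prefix.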