amoghrajesh opened a new pull request, #68283:
URL: https://github.com/apache/airflow/pull/68283

    <!-- SPDX-License-Identifier: Apache-2.0
         https://www.apache.org/licenses/LICENSE-2.0 -->
   
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   
   - [x] Yes: Claude Sonnet 4.6
   
   
   Built atop: https://github.com/apache/airflow/pull/68274 (only the last
   commit is relevant)
   
   ### What
   
   AIP-103 introduced task and asset state management backed by the metadata
   database. For deployments that need to store large state values outside the
   database, this PR adds an object storage backend via
   `apache-airflow-providers-common-io`, similar to the XCom object storage
   backend documented at
   https://airflow.apache.org/docs/apache-airflow-providers-common-io/stable/xcom_backend.html
   
   ### Current behaviour
   
   Task and asset state can only be stored in the Airflow metadata database
   (`MetastoreStateBackend`). A custom backend could offload values to S3, GCS,
   Azure Blob, or any other object storage, but no ready-made backend exists
   that does so.
   
   ### Proposed change
   
   Adds `StoreObjectStorageBackend` to `providers/common/io`, which stores task 
and asset state on object storage using `ObjectStoragePath`. The backend 
supports:
   
   - **Threshold-based offloading**: `store_objectstorage_threshold = 0`
     (default) offloads all values to the backend; set a positive byte count to
     keep small values inline in the database and offload only large ones.
   - **Optional compression**: set `store_objectstorage_compression = gzip` (or 
any fsspec-supported codec).
   - **Stable paths across retries**: task state is keyed on `(dag_id, run_id, 
task_id, map_index)`, making this backend suitable for operators using 
`ResumableJobMixin`.
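
The threshold and stable-path behaviour listed above can be sketched roughly as follows. This is an illustrative sketch only, not the code in this PR: `TaskKey`, `should_offload`, and `object_key` are hypothetical names, the exact boundary comparison is assumed from the description, and the directory layout is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TaskKey:
    """Hypothetical stand-in for the (dag_id, run_id, task_id, map_index) key."""

    dag_id: str
    run_id: str
    task_id: str
    map_index: int = -1


def should_offload(payload: bytes, threshold: int) -> bool:
    """Decide whether a serialized value goes to object storage.

    Assumption: 0 offloads everything, and a positive threshold keeps values
    of at most `threshold` bytes inline in the database.
    """
    return threshold == 0 or len(payload) > threshold


def object_key(key: TaskKey, name: str, compression: Optional[str] = None) -> str:
    """Build a relative object path (hypothetical layout).

    The try number is deliberately not part of the key, so a retry of the
    same task instance resolves to the same object.
    """
    suffix = ".gz" if compression == "gzip" else ""
    return f"{key.dag_id}/{key.run_id}/{key.task_id}/{key.map_index}/{name}{suffix}"
```

Because the key omits the try number, a second attempt of the same task instance computes the same path and can read back what the first attempt wrote.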
   
   Enable it by setting the following in `airflow.cfg` (or via env vars, using
   `COMMON_IO` as the section, e.g.
   `AIRFLOW__COMMON_IO__STORE_OBJECTSTORAGE_PATH`):
   
   ```ini
   [state_store]
   backend = airflow.providers.common.io.store.backend.StoreObjectStorageBackend
   
   [common.io]
   store_objectstorage_path = s3://conn_id@mybucket/task-state/
   store_objectstorage_threshold = 0
   store_objectstorage_compression = gzip  # optional
   ```
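
As a quick illustration of how the `[common.io]` keys above map to environment variable names, here is a small sketch. The helper name `airflow_env_var` is hypothetical; it mirrors the `AIRFLOW__{SECTION}__{KEY}` convention, with the dot in the section name replaced by an underscore as the `COMMON_IO` example in this description suggests.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Return the environment variable name for a config section and key.

    Hypothetical helper: upper-cases both parts and replaces dots in the
    section name with underscores (e.g. "common.io" becomes "COMMON_IO").
    """
    return f"AIRFLOW__{section.replace('.', '_').upper()}__{key.upper()}"


# The three options introduced by this PR, as named in the description.
COMMON_IO_KEYS = (
    "store_objectstorage_path",
    "store_objectstorage_threshold",
    "store_objectstorage_compression",
)

ENV_NAMES = [airflow_env_var("common.io", key) for key in COMMON_IO_KEYS]
```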
   
   ### Changes of Note
   
   Values persist until explicitly deleted. Use your object storage provider's
   lifecycle policies (S3 lifecycle rules, GCS object lifecycle, etc.) to
   expire old state automatically.
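
For S3, an expiry rule could look roughly like the sketch below. The `task-state/` prefix, the 30-day retention, the rule ID, and the bucket name are all assumptions chosen for illustration. The block only builds the configuration dictionary; the commented-out call shows where it would be passed to boto3's `put_bucket_lifecycle_configuration`.

```python
def expiry_lifecycle_config(prefix: str, days: int) -> dict:
    """Build an S3 lifecycle configuration that expires objects under `prefix`.

    The prefix and retention period are deployment-specific assumptions.
    """
    return {
        "Rules": [
            {
                "ID": "expire-airflow-task-state",  # hypothetical rule name
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Expiration": {"Days": days},
            }
        ]
    }


config = expiry_lifecycle_config("task-state/", 30)

# Applying it requires boto3 and valid credentials, so it is left commented out:
# import boto3
# boto3.client("s3").put_bucket_lifecycle_configuration(
#     Bucket="mybucket",  # assumed bucket name
#     LifecycleConfiguration=config,
# )
```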
   
   The backend requires Airflow >= 3.3 (when `BaseStoreBackend` and the
   `AssetScope`/`TaskScope` types were introduced).
   
   ### User implications / backcompat
   
   This is a new opt-in feature with no changes to existing deployments. It
   requires adding a connection in Airflow with the object storage credentials
   (e.g. `endpoint_url` for MinIO).
   
   ### Testing
   
   Verified end-to-end using the `example_task_store` example Dag with MinIO
   running locally. Three scenarios were tested:
   
   #### Setup
   
   1. Start MinIO: `docker run -p 29000:9000 -p 29001:9001 -e 
MINIO_ROOT_USER=minioadmin -e MINIO_ROOT_PASSWORD=minioadmin 
quay.io/minio/minio server /data --console-address ":9001"`
   2. Create the bucket `airflow-task-state` via the MinIO console
      (`http://localhost:29001`).
   
   <img width="1720" height="893" alt="image" src="https://github.com/user-attachments/assets/528a9d0e-166e-4608-aa65-780961373e13" />
   
   
   3. Add an Airflow connection `minio` with `conn_type=aws`, 
`login=minioadmin`, `password=minioadmin`, `extra={"endpoint_url": 
"http://host.docker.internal:29000"}` (use `host.docker.internal` when running 
inside Breeze/Docker).
   4. Set the base env vars and restart Airflow so that the workers inherit them:
      
   ```shell
   export AIRFLOW__WORKERS__STATE_STORE_BACKEND=airflow.providers.common.io.store.backend.StoreObjectStorageBackend
   export AIRFLOW__COMMON_IO__STORE_OBJECTSTORAGE_PATH=s3://minio@airflow-task-state/task-state/
   export AIRFLOW__COMMON_IO__STORE_OBJECTSTORAGE_THRESHOLD=0

   export AIRFLOW_CONN_MINIO='{"conn_type": "aws", "login": "minioadmin", "password": "minioadmin", "extra": {"endpoint_url": "http://host.docker.internal:29000"}}'
   ```
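
Hand-writing the JSON for `AIRFLOW_CONN_MINIO` is easy to get wrong with shell quoting, so one option is to generate the value. The sketch below is only a convenience; `connection_env_value` is a hypothetical helper, and the field names are the ones used in the export above.

```python
import json


def connection_env_value(login: str, password: str, endpoint_url: str) -> str:
    """Serialize an AWS-type connection in the JSON format shown above."""
    return json.dumps(
        {
            "conn_type": "aws",
            "login": login,
            "password": password,
            "extra": {"endpoint_url": endpoint_url},
        }
    )


value = connection_env_value(
    "minioadmin", "minioadmin", "http://host.docker.internal:29000"
)
# Wrap the result in single quotes when pasting it into a shell `export` line.
```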
   
   **Scenario 1 — All values offloaded (threshold=0)**
   
   ```
   AIRFLOW__COMMON_IO__STORE_OBJECTSTORAGE_THRESHOLD=0
   ```
   
   Verified: files appear in MinIO under `task-state/example_task_store/.../`. 
On try 1, the job ID is written and the task intentionally fails. On try 2, the 
job ID is read back from MinIO and the task reattaches to the existing job.
   
   Try 1:
   <img width="1720" height="893" alt="image" src="https://github.com/user-attachments/assets/ab83049a-02a6-497f-801b-5c56b06e6a96" />

   Try 2:

   <img width="1720" height="893" alt="image" src="https://github.com/user-attachments/assets/ba5b8ce6-90e9-4663-be35-d692372bcbb4" />

   Task store tab:

   <img width="1720" height="893" alt="image" src="https://github.com/user-attachments/assets/871d4b37-02ab-4155-884d-596ee0f21234" />

   MinIO:
   <img width="1720" height="893" alt="image" src="https://github.com/user-attachments/assets/6db801ef-6bff-40c1-b000-10c99f593966" />
   
   
   ---
   
   **Scenario 2 — Threshold: small values stay in DB, large ones go to MinIO**
   
   ```shell
   AIRFLOW__COMMON_IO__STORE_OBJECTSTORAGE_THRESHOLD=50
   ```
   
   Result:
   
   Of the 4 task store values, only `result` is larger than 50 bytes and is
   stored in the MinIO backend. The rest stay in the database.
   
   Try 1:
   <img width="1720" height="893" alt="image" src="https://github.com/user-attachments/assets/d8d861ae-2c57-4e14-b4c4-e2e2313d0a83" />

   Try 2:
   <img width="1720" height="893" alt="image" src="https://github.com/user-attachments/assets/98c86fbd-6e10-4125-a317-197fbe4ab47a" />

   <img width="1720" height="893" alt="image" src="https://github.com/user-attachments/assets/902f61c0-f9d9-4d9f-83e7-ff6f8c758c6a" />

   <img width="1720" height="893" alt="image" src="https://github.com/user-attachments/assets/06c7b907-8519-4d00-af79-d67f3a320d8f" />
   
   
   ---
   
   
   **Scenario 3 — Compression with gzip**
   
   ```shell
   AIRFLOW__COMMON_IO__STORE_OBJECTSTORAGE_THRESHOLD=0
   AIRFLOW__COMMON_IO__STORE_OBJECTSTORAGE_COMPRESSION=gzip
   ```
   
   Verified: files appear in MinIO with a `.gz` suffix (e.g. 
`task-state/example_task_store/.../job_id.gz`). Decompression is inferred 
automatically on read (`compression="infer"`). Task completes successfully.
   
   Try 1:
   
   <img width="1720" height="893" alt="image" src="https://github.com/user-attachments/assets/3e20a39f-27d9-4251-a395-24e651d4677c" />


   Try 2:
   <img width="1720" height="893" alt="image" src="https://github.com/user-attachments/assets/bfc763c7-ef53-4e90-9694-c2c761dd0af7" />


   MinIO:
   <img width="1720" height="893" alt="image" src="https://github.com/user-attachments/assets/a308a96f-0c7e-4fbd-a434-c768c4802048" />
   
   
   ```shell
   cd ~/Downloads
   open result.gz
   ```
   
   
   

