amoghrajesh opened a new pull request, #67530:
URL: https://github.com/apache/airflow/pull/67530

    <!-- SPDX-License-Identifier: Apache-2.0
         https://www.apache.org/licenses/LICENSE-2.0 -->
   
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   
   - [x] Yes - Claude Sonnet 4.6
   
   ### What problem are we solving?
   
   When a custom worker backend (e.g. S3, GCS) stores a state value externally and writes a reference string back to the DB, the UI has no way to tell whether a value like `s3://bucket/ti_123/job_id` is:
   
   - The user's actual state value (a plain string they stored), or
   - An opaque reference to externally-stored data
   
   Without this distinction, the UI would show the raw path as if it were the value, which is misleading.
   
   ### Current behaviour
   Custom backends return a reference string from `serialize_task_state_to_ref()`, which is stored verbatim in the DB. The DB value column therefore contains either a plain JSON value or a reference string, with no structural difference between the two, so the UI cannot differentiate them.
   
   ### Proposed change
   When a custom worker backend is configured, the framework now automatically wraps the reference returned by `serialize_task_state_to_ref()` in a typed envelope before storing:
   
   `{"__type": "ExternalState", "__var": "s3://bucket/ti_123/job_id"}`
   
   On read, the framework detects the envelope, extracts `__var`, and passes the raw ref to `deserialize_task_state_from_ref()`, so the backend never sees the envelope.
   
   The default path (no custom backend) is unaffected: plain JSON values are stored and returned as before.
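
The wrap-on-write and unwrap-on-read steps described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the helper names `wrap_external_ref` and `unwrap_external_ref` are hypothetical, and only the envelope keys `__type` / `__var` and the `ExternalState` tag are taken from the description.

```python
from __future__ import annotations

from typing import Any

# Envelope keys and type tag as given in the PR description.
TYPE_KEY = "__type"
VAR_KEY = "__var"
EXTERNAL_STATE = "ExternalState"


def wrap_external_ref(ref: str) -> dict[str, str]:
    """Wrap a backend-returned reference in the typed envelope (hypothetical helper)."""
    return {TYPE_KEY: EXTERNAL_STATE, VAR_KEY: ref}


def unwrap_external_ref(stored: Any) -> str | None:
    """Return the raw ref if `stored` is an ExternalState envelope, else None."""
    if (
        isinstance(stored, dict)
        and stored.get(TYPE_KEY) == EXTERNAL_STATE
        and isinstance(stored.get(VAR_KEY), str)
    ):
        return stored[VAR_KEY]
    return None
```

In this sketch a `None` result means the stored value is not an envelope and would be treated as a plain JSON value; a non-`None` result is the raw ref that would be handed to the backend's `deserialize_task_state_from_ref()`.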
   
   ### Testing
   
   I created a custom worker-side backend backed by the local file system:
   
   ```python
   from __future__ import annotations
   
   import json
   from pathlib import Path
   from typing import TYPE_CHECKING
   
   from airflow.sdk.state import BaseStateBackend
   
   if TYPE_CHECKING:
       from datetime import datetime
   
       from pydantic import JsonValue
       from sqlalchemy.ext.asyncio import AsyncSession
       from sqlalchemy.orm import Session
   
       from airflow_shared.state import StateScope
   
   BASE_DIR = Path("/tmp/airflow_state")
   
   
   class FileStateBackend(BaseStateBackend):
       """Stores task/asset state values as local JSON files; returns the path as the ref."""
   
       def serialize_task_state_to_ref(self, *, value: JsonValue, key: str, ti_id: str) -> str:
           path = BASE_DIR / f"ti_{ti_id}" / f"{key}.json"
           path.parent.mkdir(parents=True, exist_ok=True)
           path.write_text(json.dumps(value))
           return str(path)
   
       def deserialize_task_state_from_ref(self, stored: str) -> JsonValue:
           return json.loads(Path(stored).read_text())
   
       def serialize_asset_state_to_ref(self, *, value: JsonValue, key: str, asset_ref: str) -> str:
           safe = asset_ref.replace("/", "_").replace(":", "")
           path = BASE_DIR / "assets" / safe / f"{key}.json"
           path.parent.mkdir(parents=True, exist_ok=True)
           path.write_text(json.dumps(value))
           return str(path)
   
       def deserialize_asset_state_from_ref(self, stored: str) -> JsonValue:
           return json.loads(Path(stored).read_text())
   
   
       def get(self, scope: StateScope, key: str, *, session: Session | None = None) -> str | None:
           raise NotImplementedError(
               "FileStateBackend is a worker-side backend; server uses MetastoreStateBackend"
           )
   
       def set(
           self,
           scope: StateScope,
           key: str,
           value: str,
           *,
           expires_at: datetime | None = None,
           session: Session | None = None,
       ) -> None:
           raise NotImplementedError
   
       def delete(self, scope: StateScope, key: str, *, session: Session | None = None) -> None:
           raise NotImplementedError
   
       def clear(
           self, scope: StateScope, *, all_map_indices: bool = False, session: Session | None = None
       ) -> None:
           raise NotImplementedError
   
       async def aget(self, scope: StateScope, key: str, *, session: AsyncSession | None = None) -> str | None:
           raise NotImplementedError
   
       async def aset(
           self,
           scope: StateScope,
           key: str,
           value: str,
           *,
           expires_at: datetime | None = None,
           session: AsyncSession | None = None,
       ) -> None:
           raise NotImplementedError
   
       async def adelete(self, scope: StateScope, key: str, *, session: AsyncSession | None = None) -> None:
           raise NotImplementedError
   
       async def aclear(
           self, scope: StateScope, *, all_map_indices: bool = False, session: AsyncSession | None = None
       ) -> None:
           raise NotImplementedError
   
   ```
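
To sanity-check the serialize/deserialize pair without an Airflow install, the same file logic can be exercised on its own. The functions below are hypothetical standalone copies of the two task-state methods (they are not part of this PR), and they write to a temporary directory rather than `/tmp/airflow_state`.

```python
import json
import tempfile
from pathlib import Path


def serialize_to_ref(base_dir: Path, *, value, key: str, ti_id: str) -> str:
    """Write the value as JSON under base_dir and return the file path as the ref."""
    path = base_dir / f"ti_{ti_id}" / f"{key}.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(value))
    return str(path)


def deserialize_from_ref(stored: str):
    """Read the JSON value back from the path stored as the ref."""
    return json.loads(Path(stored).read_text())


def round_trip(value, key: str = "job_id", ti_id: str = "123"):
    """Store a value, read it back, and return (ref, restored_value)."""
    with tempfile.TemporaryDirectory() as tmp:
        ref = serialize_to_ref(Path(tmp), value=value, key=key, ti_id=ti_id)
        return ref, deserialize_from_ref(ref)
```

The ref returned here is just a path string, which is exactly the kind of value that is indistinguishable from a user-supplied string once it is stored without an envelope.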
   
   Ran breeze with: `export AIRFLOW__WORKERS__STATE_BACKEND=file_state_backend.FileStateBackend`
   
   DAG:
   ```python
   from datetime import datetime
   
   from airflow.sdk import Context, dag, task
   
   
   @dag(schedule=None, start_date=datetime(2026, 4, 23), catchup=True)
   def simple_task_state():
   
       @task
       def my_task(**context: Context):
           task_state = context["task_state"]
   
           task_state.set("job_id", "12345")
           task_state.set("secret-dict", {"key": "value"})
   
           print("Fetching task states I stored earlier")
   
           print("job_id:", task_state.get("job_id"))
           print("secret-dict:", task_state.get("secret-dict"))
   
       my_task()
   
   
   simple_task_state()
   
   ```
   
   The task code is unaware of the envelope: it reads back exactly the values it stored.
   
   <img width="2504" height="987" alt="image" src="https://github.com/user-attachments/assets/810d4b94-2a84-4884-84ca-7d9bdeb61f05" />
   
   
   The file system now contains the files generated by the custom backend:
   
   ```shell
   [Breeze:3.10.20] root@4bdd2c99b18b:/tmp/airflow_state/ti_019e62fc-9e44-715b-8cb9-cfd6865f8c83$ cat
   job_id.json       secret-dict.json
   [Breeze:3.10.20] root@4bdd2c99b18b:/tmp/airflow_state/ti_019e62fc-9e44-715b-8cb9-cfd6865f8c83$ cat job_id.json
   "12345"[Breeze:3.10.20] root@4bdd2c99b18b:/tmp/airflow_state/ti_019e62fc-9e44-715b-8cb9-cfd6865f8c83$ cat secret-dict.json
   {"key": "value"}[Breeze:3.10.20] root@4bdd2c99b18b:/tmp/airflow_state/ti_019e62fc-9e44-715b-8cb9-cfd6865f8c83$
   ```
   
   
   The Core API returns the external reference, which the UI can build upon:
   
   <img width="2559" height="987" alt="image" src="https://github.com/user-attachments/assets/c0fb9c05-5a86-48c1-9d1f-0f6174bb7d89" />
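
A consumer of the API, such as the UI, could branch on the envelope along the following lines. This is only a sketch under assumptions: it is written in Python for consistency with the rest of this description (the actual UI is not Python), `describe_state_value` is a hypothetical name, and the value is assumed to arrive in the same shape in which it is stored.

```python
from __future__ import annotations

from typing import Any


def describe_state_value(value: Any) -> tuple[str, Any]:
    """Classify a stored state value as ("external", ref) or ("inline", value)."""
    is_envelope = (
        isinstance(value, dict)
        and value.get("__type") == "ExternalState"
        and "__var" in value
    )
    if is_envelope:
        # The payload is an opaque reference to externally stored data.
        return ("external", value["__var"])
    # Anything else is the user's actual state value.
    return ("inline", value)
```

With this distinction a plain string such as `s3://bucket/ti_123/job_id` is classified as an inline value, while the same string inside the envelope is classified as an external reference.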
   
   
   
   
   

