This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d9b7696b7e feat: add partition_key to DagRunAssetReference (#61725)
2d9b7696b7e is described below

commit 2d9b7696b7efce35a5a142f09975756f795d4f12
Author: Wei Lee <[email protected]>
AuthorDate: Thu Feb 12 23:22:27 2026 +0800

    feat: add partition_key to DagRunAssetReference (#61725)
    
    * feat: add partition_key to DagRunAssetReference
    
    * test: extend test case to include partition_key
    
    * feat: add migration
    
    * fixup! feat: add migration
---
 .../airflow/api_fastapi/core_api/datamodels/assets.py |  3 ++-
 .../core_api/openapi/v2-rest-api-generated.yaml       |  8 +++++++-
 .../execution_api/datamodels/asset_event.py           |  1 +
 .../api_fastapi/execution_api/versions/v2025_11_07.py |  7 ++++++-
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts    | 15 +++++++++++++--
 .../src/airflow/ui/openapi-gen/requests/types.gen.ts  |  3 ++-
 .../api_fastapi/core_api/routes/public/test_assets.py |  4 ++++
 .../core_api/routes/public/test_dag_run.py            | 19 +++++++++++++++----
 .../src/airflowctl/api/datamodels/generated.py        |  3 ++-
 airflow-ctl/tests/airflow_ctl/api/test_operations.py  |  1 +
 task-sdk/src/airflow/sdk/api/datamodels/_generated.py |  1 +
 11 files changed, 54 insertions(+), 11 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
index d6a1106dbc2..c6cb2fa2827 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -113,7 +113,7 @@ class AssetAliasCollectionResponse(BaseModel):
 
 
 class DagRunAssetReference(StrictBaseModel):
-    """DAGRun serializer for asset responses."""
+    """DagRun serializer for asset responses."""
 
     run_id: str
     dag_id: str
@@ -123,6 +123,7 @@ class DagRunAssetReference(StrictBaseModel):
     state: str
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    partition_key: str | None
 
 
 class AssetEventResponse(BaseModel):
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 11c8007ada6..40d88a96b82 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -11043,6 +11043,11 @@ components:
             format: date-time
           - type: 'null'
           title: Data Interval End
+        partition_key:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Partition Key
       additionalProperties: false
       type: object
       required:
@@ -11054,8 +11059,9 @@ components:
       - state
       - data_interval_start
       - data_interval_end
+      - partition_key
       title: DagRunAssetReference
-      description: DAGRun serializer for asset responses.
+      description: DagRun serializer for asset responses.
     DagRunState:
       type: string
       enum:
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py
index 64623c17fb1..f6c3ce82669 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py
@@ -36,6 +36,7 @@ class DagRunAssetReference(StrictBaseModel):
     state: str
     data_interval_start: datetime | None
     data_interval_end: datetime | None
+    partition_key: str | None
 
 
 class AssetEventResponse(BaseModel):
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py
index 9363d825cef..117ba492455 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py
@@ -19,7 +19,11 @@ from __future__ import annotations
 
 from cadwyn import ResponseInfo, VersionChange, 
convert_response_to_previous_version_for, schema
 
-from airflow.api_fastapi.execution_api.datamodels.asset_event import 
AssetEventResponse, AssetEventsResponse
+from airflow.api_fastapi.execution_api.datamodels.asset_event import (
+    AssetEventResponse,
+    AssetEventsResponse,
+    DagRunAssetReference,
+)
 from airflow.api_fastapi.execution_api.datamodels.dagrun import 
TriggerDAGRunPayload
 from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, 
TIRunContext
 
@@ -33,6 +37,7 @@ class AddPartitionKeyField(VersionChange):
         schema(DagRun).field("partition_key").didnt_exist,
         schema(AssetEventResponse).field("partition_key").didnt_exist,
         schema(TriggerDAGRunPayload).field("partition_key").didnt_exist,
+        schema(DagRunAssetReference).field("partition_key").didnt_exist,
     )
 
     @convert_response_to_previous_version_for(TIRunContext)  # type: 
ignore[arg-type]
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index e0b53fb6d6f..a533f31c31f 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -3228,13 +3228,24 @@ export const $DagRunAssetReference = {
                 }
             ],
             title: 'Data Interval End'
+        },
+        partition_key: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Partition Key'
         }
     },
     additionalProperties: false,
     type: 'object',
-    required: ['run_id', 'dag_id', 'logical_date', 'start_date', 'end_date', 
'state', 'data_interval_start', 'data_interval_end'],
+    required: ['run_id', 'dag_id', 'logical_date', 'start_date', 'end_date', 
'state', 'data_interval_start', 'data_interval_end', 'partition_key'],
     title: 'DagRunAssetReference',
-    description: 'DAGRun serializer for asset responses.'
+    description: 'DagRun serializer for asset responses.'
 } as const;
 
 export const $DagRunState = {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index c409b984d10..0c3ecfe7ce0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -782,7 +782,7 @@ export type DagProcessorInfoResponse = {
 };
 
 /**
- * DAGRun serializer for asset responses.
+ * DagRun serializer for asset responses.
  */
 export type DagRunAssetReference = {
     run_id: string;
@@ -793,6 +793,7 @@ export type DagRunAssetReference = {
     state: string;
     data_interval_start: string | null;
     data_interval_end: string | null;
+    partition_key: string | null;
 };
 
 /**
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
index 607a7474893..da3fba12a6a 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
@@ -815,6 +815,7 @@ class TestGetAssetEvents(TestAssets):
                             "state": "success",
                             "data_interval_start": 
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
                             "data_interval_end": 
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
+                            "partition_key": None,
                         }
                     ],
                     "timestamp": 
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
@@ -843,6 +844,7 @@ class TestGetAssetEvents(TestAssets):
                             "state": "success",
                             "data_interval_start": 
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
                             "data_interval_end": 
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
+                            "partition_key": None,
                         }
                     ],
                     "timestamp": 
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
@@ -998,6 +1000,7 @@ class TestGetAssetEvents(TestAssets):
                             "state": "success",
                             "data_interval_start": 
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
                             "data_interval_end": 
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
+                            "partition_key": None,
                         }
                     ],
                     "timestamp": 
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
@@ -1026,6 +1029,7 @@ class TestGetAssetEvents(TestAssets):
                             "state": "success",
                             "data_interval_start": 
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
                             "data_interval_end": 
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
+                            "partition_key": None,
                         }
                     ],
                     "timestamp": 
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 577acd5fac5..1664103271f 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -1343,12 +1343,17 @@ class TestDeleteDagRun:
 
 class TestGetDagRunAssetTriggerEvents:
     @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
-    def test_should_respond_200(self, test_client, dag_maker, session):
+    @pytest.mark.parametrize(
+        "partition_key",
+        ["test_partition_key", None],
+        ids=["partitioned", "non-partitioned"],
+    )
+    def test_should_respond_200(self, partition_key, test_client, dag_maker, 
session):
         asset1 = Asset(name="ds1", uri="file:///da1")
 
         with dag_maker(dag_id="source_dag", start_date=START_DATE1, 
session=session):
             EmptyOperator(task_id="task", outlets=[asset1])
-        dr = dag_maker.create_dagrun()
+        dr = dag_maker.create_dagrun(partition_key=partition_key)
         ti = dr.task_instances[0]
 
         asset1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset1.uri))
@@ -1358,12 +1363,17 @@ class TestGetDagRunAssetTriggerEvents:
             source_dag_id=ti.dag_id,
             source_run_id=ti.run_id,
             source_map_index=ti.map_index,
+            partition_key=partition_key,
         )
         session.add(event)
 
         with dag_maker(dag_id="TEST_DAG_ID", start_date=START_DATE1, 
session=session):
             pass
-        dr = dag_maker.create_dagrun(run_id="TEST_DAG_RUN_ID", 
run_type=DagRunType.ASSET_TRIGGERED)
+        dr = dag_maker.create_dagrun(
+            run_id="TEST_DAG_RUN_ID",
+            run_type=DagRunType.ASSET_TRIGGERED,
+            partition_key=partition_key,
+        )
         dr.consumed_asset_events.append(event)
 
         session.commit()
@@ -1398,9 +1408,10 @@ class TestGetDagRunAssetTriggerEvents:
                             "logical_date": 
from_datetime_to_zulu_without_ms(dr.logical_date),
                             "start_date": 
from_datetime_to_zulu_without_ms(dr.start_date),
                             "state": "running",
+                            "partition_key": partition_key,
                         }
                     ],
-                    "partition_key": None,
+                    "partition_key": partition_key,
                 }
             ],
             "total_entries": 1,
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 921709c0b68..92d77aa4a33 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -339,7 +339,7 @@ class DagProcessorInfoResponse(BaseModel):
 
 class DagRunAssetReference(BaseModel):
     """
-    DAGRun serializer for asset responses.
+    DagRun serializer for asset responses.
     """
 
     model_config = ConfigDict(
@@ -353,6 +353,7 @@ class DagRunAssetReference(BaseModel):
     state: Annotated[str, Field(title="State")]
     data_interval_start: Annotated[datetime | None, Field(title="Data Interval 
Start")] = None
     data_interval_end: Annotated[datetime | None, Field(title="Data Interval 
End")] = None
+    partition_key: Annotated[str | None, Field(title="Partition Key")] = None
 
 
 class DagRunState(str, Enum):
diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py 
b/airflow-ctl/tests/airflow_ctl/api/test_operations.py
index 5e4a8fb9315..ecfa758df32 100644
--- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py
+++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py
@@ -272,6 +272,7 @@ class TestAssetsOperations:
         state="RUNNING",
         data_interval_start=datetime.datetime(2025, 1, 1, 0, 0, 0),
         data_interval_end=datetime.datetime(2025, 1, 1, 0, 0, 0),
+        partition_key=None,
     )
 
     asset_event_response = AssetEventResponse(
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py 
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index 3c2099ffbf5..32824a48b21 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -94,6 +94,7 @@ class DagRunAssetReference(BaseModel):
     state: Annotated[str, Field(title="State")]
     data_interval_start: Annotated[AwareDatetime | None, Field(title="Data 
Interval Start")] = None
     data_interval_end: Annotated[AwareDatetime | None, Field(title="Data 
Interval End")] = None
+    partition_key: Annotated[str | None, Field(title="Partition Key")] = None
 
 
 class DagRunState(str, Enum):

Reply via email to