This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a21be58ed19 AIP-84: alias `dag_display_name` for `Job` and `Xcom`
(#50065)
a21be58ed19 is described below
commit a21be58ed195ecd05b7149fe116d02586f1ec3f7
Author: Guan Ming(Wesley) Chiu <[email protected]>
AuthorDate: Tue May 6 01:02:10 2025 +0800
AIP-84: alias `dag_display_name` for `Job` and `Xcom` (#50065)
* AIP-84: alias dag_display_name for `Job` and `Xcom`
* Remove unused relationship
Co-authored-by: pierrejeambrun <[email protected]>
* Fix `AliasPath`
---------
Co-authored-by: pierrejeambrun <[email protected]>
---
.../airflow/api_fastapi/core_api/datamodels/job.py | 5 ++
.../api_fastapi/core_api/datamodels/xcom.py | 3 +-
.../core_api/openapi/v1-rest-api-generated.yaml | 17 +++++++
.../api_fastapi/core_api/routes/public/job.py | 8 ++-
.../api_fastapi/core_api/routes/public/xcom.py | 8 ++-
airflow-core/src/airflow/jobs/job.py | 7 +++
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 58 ++++++++++++++++++++--
.../airflow/ui/openapi-gen/requests/types.gen.ts | 4 ++
.../api_fastapi/core_api/routes/public/test_job.py | 1 +
.../core_api/routes/public/test_task_instances.py | 1 +
.../core_api/routes/public/test_xcom.py | 13 +++++
.../src/airflowctl/api/datamodels/generated.py | 4 ++
12 files changed, 123 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/job.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/job.py
index d797464ee90..c06ac730c2b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/job.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/job.py
@@ -18,6 +18,8 @@ from __future__ import annotations
from datetime import datetime
+from pydantic import AliasPath, Field
+
from airflow.api_fastapi.core_api.base import BaseModel
@@ -34,6 +36,9 @@ class JobResponse(BaseModel):
executor_class: str | None
hostname: str | None
unixname: str | None
+ dag_display_name: str | None = Field(
+ validation_alias=AliasPath("dag_model", "dag_display_name"),
default=None
+ )
class JobCollectionResponse(BaseModel):
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
index 189fad8290c..ec65436955b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
@@ -19,7 +19,7 @@ from __future__ import annotations
from datetime import datetime
from typing import Any
-from pydantic import field_validator
+from pydantic import AliasPath, Field, field_validator
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
@@ -34,6 +34,7 @@ class XComResponse(BaseModel):
task_id: str
dag_id: str
run_id: str
+ dag_display_name: str = Field(validation_alias=AliasPath("dag_run",
"dag_model", "dag_display_name"))
class XComResponseNative(XComResponse):
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml
index f0c7dd5cdbd..963258a0548 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml
@@ -9201,6 +9201,11 @@ components:
- type: string
- type: 'null'
title: Unixname
+ dag_display_name:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Dag Display Name
type: object
required:
- id
@@ -10624,6 +10629,9 @@ components:
run_id:
type: string
title: Run Id
+ dag_display_name:
+ type: string
+ title: Dag Display Name
type: object
required:
- key
@@ -10633,6 +10641,7 @@ components:
- task_id
- dag_id
- run_id
+ - dag_display_name
title: XComResponse
description: Serializer for a xcom item.
XComResponseNative:
@@ -10662,6 +10671,9 @@ components:
run_id:
type: string
title: Run Id
+ dag_display_name:
+ type: string
+ title: Dag Display Name
value:
title: Value
type: object
@@ -10673,6 +10685,7 @@ components:
- task_id
- dag_id
- run_id
+ - dag_display_name
- value
title: XComResponseNative
description: XCom response serializer with native return type.
@@ -10703,6 +10716,9 @@ components:
run_id:
type: string
title: Run Id
+ dag_display_name:
+ type: string
+ title: Dag Display Name
value:
anyOf:
- type: string
@@ -10717,6 +10733,7 @@ components:
- task_id
- dag_id
- run_id
+ - dag_display_name
- value
title: XComResponseString
description: XCom response serializer with string return type.
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/job.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/job.py
index 997a9152909..b1a35913207 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/job.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/job.py
@@ -20,6 +20,7 @@ from typing import Annotated
from fastapi import Depends, status
from sqlalchemy import select
+from sqlalchemy.orm import joinedload
from airflow.api_fastapi.common.db.common import (
SessionDep,
@@ -101,7 +102,12 @@ def get_jobs(
is_alive: bool | None = None,
) -> JobCollectionResponse:
"""Get all jobs."""
- base_select = select(Job).where(Job.state ==
JobState.RUNNING).order_by(Job.latest_heartbeat.desc())
+ base_select = (
+ select(Job)
+ .where(Job.state == JobState.RUNNING)
+ .order_by(Job.latest_heartbeat.desc())
+ .options(joinedload(Job.dag_model))
+ )
jobs_select, total_entries = paginated_select(
statement=base_select,
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
index 4b1361d4027..2dc746ddfe2 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
@@ -21,6 +21,7 @@ from typing import Annotated
from fastapi import Depends, HTTPException, Query, Request, status
from sqlalchemy import and_, select
+from sqlalchemy.orm import joinedload
from airflow.api_fastapi.auth.managers.models.resource_details import
DagAccessEntity
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
@@ -84,6 +85,7 @@ def get_xcom_entry(
)
query = query.join(DR, and_(XComModel.dag_id == DR.dag_id,
XComModel.run_id == DR.run_id))
query = query.where(DR.run_id == dag_run_id)
+ query =
query.options(joinedload(XComModel.dag_run).joinedload(DR.dag_model))
if deserialize:
item = session.execute(query).one_or_none()
@@ -136,7 +138,9 @@ def get_xcom_entries(
query = select(XComModel)
if dag_id != "~":
query = query.where(XComModel.dag_id == dag_id)
- query = query.join(DR, and_(XComModel.dag_id == DR.dag_id,
XComModel.run_id == DR.run_id))
+ query = query.join(DR, and_(XComModel.dag_id == DR.dag_id,
XComModel.run_id == DR.run_id)).options(
+ joinedload(XComModel.dag_run).joinedload(DR.dag_model)
+ )
if task_id != "~":
query = query.where(XComModel.task_id == task_id)
@@ -249,6 +253,7 @@ def create_xcom_entry(
XComModel.map_index == request_body.map_index,
)
.limit(1)
+ .options(joinedload(XComModel.dag_run).joinedload(DR.dag_model))
)
return XComResponseNative.model_validate(xcom)
@@ -289,6 +294,7 @@ def update_xcom_entry(
XComModel.map_index == patch_body.map_index,
)
.limit(1)
+ .options(joinedload(XComModel.dag_run).joinedload(DR.dag_model))
)
if not xcom_entry:
diff --git a/airflow-core/src/airflow/jobs/job.py
b/airflow-core/src/airflow/jobs/job.py
index 8697c4d2be2..9c06552abca 100644
--- a/airflow-core/src/airflow/jobs/job.py
+++ b/airflow-core/src/airflow/jobs/job.py
@@ -109,6 +109,13 @@ class Job(Base, LoggingMixin):
backref="creating_job",
)
+ dag_model = relationship(
+ "DagModel",
+ primaryjoin="Job.dag_id == DagModel.dag_id",
+ viewonly=True,
+ foreign_keys=[dag_id],
+ )
+
"""
TaskInstances which have been enqueued by this Job.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 4b60ff13a46..a449fdb9cfa 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -3425,6 +3425,17 @@ export const $JobResponse = {
],
title: "Unixname",
},
+ dag_display_name: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Dag Display Name",
+ },
},
type: "object",
required: [
@@ -5664,9 +5675,22 @@ export const $XComResponse = {
type: "string",
title: "Run Id",
},
+ dag_display_name: {
+ type: "string",
+ title: "Dag Display Name",
+ },
},
type: "object",
- required: ["key", "timestamp", "logical_date", "map_index", "task_id",
"dag_id", "run_id"],
+ required: [
+ "key",
+ "timestamp",
+ "logical_date",
+ "map_index",
+ "task_id",
+ "dag_id",
+ "run_id",
+ "dag_display_name",
+ ],
title: "XComResponse",
description: "Serializer for a xcom item.",
} as const;
@@ -5710,12 +5734,26 @@ export const $XComResponseNative = {
type: "string",
title: "Run Id",
},
+ dag_display_name: {
+ type: "string",
+ title: "Dag Display Name",
+ },
value: {
title: "Value",
},
},
type: "object",
- required: ["key", "timestamp", "logical_date", "map_index", "task_id",
"dag_id", "run_id", "value"],
+ required: [
+ "key",
+ "timestamp",
+ "logical_date",
+ "map_index",
+ "task_id",
+ "dag_id",
+ "run_id",
+ "dag_display_name",
+ "value",
+ ],
title: "XComResponseNative",
description: "XCom response serializer with native return type.",
} as const;
@@ -5759,6 +5797,10 @@ export const $XComResponseString = {
type: "string",
title: "Run Id",
},
+ dag_display_name: {
+ type: "string",
+ title: "Dag Display Name",
+ },
value: {
anyOf: [
{
@@ -5772,7 +5814,17 @@ export const $XComResponseString = {
},
},
type: "object",
- required: ["key", "timestamp", "logical_date", "map_index", "task_id",
"dag_id", "run_id", "value"],
+ required: [
+ "key",
+ "timestamp",
+ "logical_date",
+ "map_index",
+ "task_id",
+ "dag_id",
+ "run_id",
+ "dag_display_name",
+ "value",
+ ],
title: "XComResponseString",
description: "XCom response serializer with string return type.",
} as const;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index a45ed3667e0..38d0a058b1b 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -902,6 +902,7 @@ export type JobResponse = {
executor_class: string | null;
hostname: string | null;
unixname: string | null;
+ dag_display_name?: string | null;
};
export type JsonValue = unknown;
@@ -1389,6 +1390,7 @@ export type XComResponse = {
task_id: string;
dag_id: string;
run_id: string;
+ dag_display_name: string;
};
/**
@@ -1402,6 +1404,7 @@ export type XComResponseNative = {
task_id: string;
dag_id: string;
run_id: string;
+ dag_display_name: string;
value: unknown;
};
@@ -1416,6 +1419,7 @@ export type XComResponseString = {
task_id: string;
dag_id: string;
run_id: string;
+ dag_display_name: string;
value: string | null;
};
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_job.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_job.py
index 6f6cb2fc156..d4ea28cb3b3 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_job.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_job.py
@@ -152,6 +152,7 @@ class TestGetJobs(TestJobEndpoint):
for idx, resp_job in enumerate(response_json["jobs"]):
expected_job = {
"id": self.scheduler_jobs[idx].id,
+ "dag_display_name": None,
"dag_id": None,
"state": "running",
"job_type": "SchedulerJob",
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index f75d8cfdab2..13fe165716c 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -344,6 +344,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
"kwargs": "{}",
},
"triggerer_job": {
+ "dag_display_name": None,
"dag_id": None,
"end_date": None,
"job_type": "TriggererJob",
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
index 92deaddd0a4..9bcc8138257 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
@@ -45,10 +45,12 @@ TEST_XCOM_VALUE_AS_JSON = json.dumps(TEST_XCOM_VALUE)
TEST_XCOM_KEY_2 = "test_xcom_key_non_existing"
TEST_DAG_ID = "test-dag-id"
+TEST_DAG_DISPLAY_NAME = "test-dag-id"
TEST_TASK_ID = "test-task-id"
TEST_EXECUTION_DATE = "2005-04-02T00:00:00+00:00"
TEST_DAG_ID_2 = "test-dag-id-2"
+TEST_DAG_DISPLAY_NAME_2 = "test-dag-id-2"
TEST_TASK_ID_2 = "test-task-id-2"
logical_date_parsed = timezone.parse(TEST_EXECUTION_DATE)
@@ -125,6 +127,7 @@ class TestGetXComEntry(TestXComEndpoint):
current_data = response.json()
assert current_data == {
"dag_id": TEST_DAG_ID,
+ "dag_display_name": TEST_DAG_DISPLAY_NAME,
"logical_date": logical_date_parsed.strftime("%Y-%m-%dT%H:%M:%SZ"),
"run_id": run_id,
"key": TEST_XCOM_KEY,
@@ -232,6 +235,7 @@ class TestGetXComEntries(TestXComEndpoint):
"xcom_entries": [
{
"dag_id": TEST_DAG_ID,
+ "dag_display_name": TEST_DAG_DISPLAY_NAME,
"logical_date": logical_date_formatted,
"run_id": run_id,
"key": f"{TEST_XCOM_KEY}-0",
@@ -241,6 +245,7 @@ class TestGetXComEntries(TestXComEndpoint):
},
{
"dag_id": TEST_DAG_ID,
+ "dag_display_name": TEST_DAG_DISPLAY_NAME,
"logical_date": logical_date_formatted,
"run_id": run_id,
"key": f"{TEST_XCOM_KEY}-1",
@@ -267,6 +272,7 @@ class TestGetXComEntries(TestXComEndpoint):
"xcom_entries": [
{
"dag_id": TEST_DAG_ID,
+ "dag_display_name": TEST_DAG_DISPLAY_NAME,
"logical_date": logical_date_formatted,
"run_id": run_id,
"key": f"{TEST_XCOM_KEY}-0",
@@ -276,6 +282,7 @@ class TestGetXComEntries(TestXComEndpoint):
},
{
"dag_id": TEST_DAG_ID,
+ "dag_display_name": TEST_DAG_DISPLAY_NAME,
"logical_date": logical_date_formatted,
"run_id": run_id,
"key": f"{TEST_XCOM_KEY}-1",
@@ -285,6 +292,7 @@ class TestGetXComEntries(TestXComEndpoint):
},
{
"dag_id": TEST_DAG_ID_2,
+ "dag_display_name": TEST_DAG_DISPLAY_NAME_2,
"logical_date": logical_date_formatted,
"run_id": run_id,
"key": f"{TEST_XCOM_KEY}-0",
@@ -294,6 +302,7 @@ class TestGetXComEntries(TestXComEndpoint):
},
{
"dag_id": TEST_DAG_ID_2,
+ "dag_display_name": TEST_DAG_DISPLAY_NAME_2,
"logical_date": logical_date_formatted,
"run_id": run_id,
"key": f"{TEST_XCOM_KEY}-1",
@@ -321,6 +330,7 @@ class TestGetXComEntries(TestXComEndpoint):
expected_entries = [
{
"dag_id": TEST_DAG_ID,
+ "dag_display_name": TEST_DAG_DISPLAY_NAME,
"logical_date": logical_date_formatted,
"run_id": run_id,
"key": TEST_XCOM_KEY,
@@ -334,6 +344,7 @@ class TestGetXComEntries(TestXComEndpoint):
expected_entries = [
{
"dag_id": TEST_DAG_ID,
+ "dag_display_name": TEST_DAG_DISPLAY_NAME,
"logical_date": logical_date_formatted,
"run_id": run_id,
"key": TEST_XCOM_KEY,
@@ -357,6 +368,7 @@ class TestGetXComEntries(TestXComEndpoint):
[
{
"dag_id": TEST_DAG_ID,
+ "dag_display_name": TEST_DAG_DISPLAY_NAME,
"logical_date": logical_date_formatted,
"run_id": run_id,
"key": TEST_XCOM_KEY,
@@ -366,6 +378,7 @@ class TestGetXComEntries(TestXComEndpoint):
},
{
"dag_id": TEST_DAG_ID,
+ "dag_display_name": TEST_DAG_DISPLAY_NAME,
"logical_date": logical_date_formatted,
"run_id": run_id,
"key": TEST_XCOM_KEY,
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 41eaf39201d..48992ec5285 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -562,6 +562,7 @@ class JobResponse(BaseModel):
executor_class: Annotated[str | None, Field(title="Executor Class")] = None
hostname: Annotated[str | None, Field(title="Hostname")] = None
unixname: Annotated[str | None, Field(title="Unixname")] = None
+ dag_display_name: Annotated[str | None, Field(title="Dag Display Name")] =
None
class JsonValue(RootModel[Any]):
@@ -892,6 +893,7 @@ class XComResponse(BaseModel):
task_id: Annotated[str, Field(title="Task Id")]
dag_id: Annotated[str, Field(title="Dag Id")]
run_id: Annotated[str, Field(title="Run Id")]
+ dag_display_name: Annotated[str, Field(title="Dag Display Name")]
class XComResponseNative(BaseModel):
@@ -906,6 +908,7 @@ class XComResponseNative(BaseModel):
task_id: Annotated[str, Field(title="Task Id")]
dag_id: Annotated[str, Field(title="Dag Id")]
run_id: Annotated[str, Field(title="Run Id")]
+ dag_display_name: Annotated[str, Field(title="Dag Display Name")]
value: Annotated[Any, Field(title="Value")]
@@ -921,6 +924,7 @@ class XComResponseString(BaseModel):
task_id: Annotated[str, Field(title="Task Id")]
dag_id: Annotated[str, Field(title="Dag Id")]
run_id: Annotated[str, Field(title="Run Id")]
+ dag_display_name: Annotated[str, Field(title="Dag Display Name")]
value: Annotated[str | None, Field(title="Value")] = None