This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8379804182a AIP-65 | Add dag versions to DAGRunResponse (#46484)
8379804182a is described below
commit 8379804182a904d6be2f30ff0123b7a42325b9a2
Author: LIU ZHE YOU <[email protected]>
AuthorDate: Tue Feb 25 23:23:25 2025 +0800
AIP-65 | Add dag versions to DAGRunResponse (#46484)
* AIP-65 | Add dag versions to DAGRunResponse
* fixup! Fix configure_git_connection_for_dag_bundle fixture import
* fixup! Fix dag_versions property
* Fix test_cli_assets_materialize
* Fix nits in test_dag_run
* Fix conflicts, fix CI
---------
Co-authored-by: pierrejeambrun <[email protected]>
---
airflow/api_fastapi/core_api/datamodels/dag_run.py | 2 +
.../api_fastapi/core_api/openapi/v1-generated.yaml | 6 +
airflow/models/dagrun.py | 6 +-
airflow/ui/openapi-gen/requests/schemas.gen.ts | 8 ++
airflow/ui/openapi-gen/requests/types.gen.ts | 1 +
tests/api_fastapi/conftest.py | 55 ++++-----
.../core_api/routes/public/test_assets.py | 2 +
.../core_api/routes/public/test_dag_run.py | 125 +++++++++++----------
tests/api_fastapi/core_api/routes/ui/test_dags.py | 2 +
.../commands/remote_commands/test_asset_command.py | 8 +-
10 files changed, 129 insertions(+), 86 deletions(-)
diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py
b/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 06943dc3860..b2a7ffa860a 100644
--- a/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -24,6 +24,7 @@ from typing import TYPE_CHECKING
from pydantic import AwareDatetime, Field, NonNegativeInt, model_validator
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
+from airflow.api_fastapi.core_api.datamodels.dag_versions import
DagVersionResponse
from airflow.models import DagRun
from airflow.timetables.base import DataInterval
from airflow.utils import timezone
@@ -74,6 +75,7 @@ class DAGRunResponse(BaseModel):
triggered_by: DagRunTriggeredByType
conf: dict
note: str | None
+ dag_versions: list[DagVersionResponse]
class DAGRunCollectionResponse(BaseModel):
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 8a496750162..3324e39777b 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -8537,6 +8537,11 @@ components:
- type: string
- type: 'null'
title: Note
+ dag_versions:
+ items:
+ $ref: '#/components/schemas/DagVersionResponse'
+ type: array
+ title: Dag Versions
type: object
required:
- dag_run_id
@@ -8554,6 +8559,7 @@ components:
- triggered_by
- conf
- note
+ - dag_versions
title: DAGRunResponse
description: DAG Run serializer for responses.
DAGRunStates:
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 45271c810ef..06c533c1299 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -310,7 +310,11 @@ class DagRun(Base, LoggingMixin):
@property
def dag_versions(self) -> list[DagVersion]:
"""Return the DAG versions associated with the TIs of this DagRun."""
- dag_versions = list(dict.fromkeys(list(self._tih_dag_versions) +
list(self._ti_dag_versions)))
+ dag_versions = [
+ dv
+ for dv in dict.fromkeys(list(self._tih_dag_versions) +
list(self._ti_dag_versions))
+ if dv is not None
+ ]
sorted_ = sorted(dag_versions, key=lambda dv: dv.id)
return sorted_
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index b2f32e1622b..cd5583cf104 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2342,6 +2342,13 @@ export const $DAGRunResponse = {
],
title: "Note",
},
+ dag_versions: {
+ items: {
+ $ref: "#/components/schemas/DagVersionResponse",
+ },
+ type: "array",
+ title: "Dag Versions",
+ },
},
type: "object",
required: [
@@ -2360,6 +2367,7 @@ export const $DAGRunResponse = {
"triggered_by",
"conf",
"note",
+ "dag_versions",
],
title: "DAGRunResponse",
description: "DAG Run serializer for responses.",
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 32b93883925..6d4fb662392 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -631,6 +631,7 @@ export type DAGRunResponse = {
[key: string]: unknown;
};
note: string | null;
+ dag_versions: Array<DagVersionResponse>;
};
/**
diff --git a/tests/api_fastapi/conftest.py b/tests/api_fastapi/conftest.py
index abb2c135dcb..3e5b44651f0 100644
--- a/tests/api_fastapi/conftest.py
+++ b/tests/api_fastapi/conftest.py
@@ -49,15 +49,7 @@ def client():
@pytest.fixture
-def make_dag_with_multiple_versions(dag_maker, session):
- """
- Create DAG with multiple versions
-
- Version 1 will have 1 task, version 2 will have 2 tasks, and version 3
will have 3 tasks.
-
- Configure the associated dag_bundles.
- """
-
+def configure_git_connection_for_dag_bundle(session):
# Git connection is required for the bundles to have a url.
connection = Connection(
conn_id="git_default",
@@ -68,7 +60,6 @@ def make_dag_with_multiple_versions(dag_maker, session):
login="",
)
session.add(connection)
-
with conf_vars(
{
(
@@ -77,24 +68,36 @@ def make_dag_with_multiple_versions(dag_maker, session):
): '[{ "name": "dag_maker", "classpath":
"airflow.dag_processing.bundles.git.GitDagBundle", "kwargs": {"subdir": "dags",
"tracking_ref": "main", "refresh_interval": 0}}, { "name":
"another_bundle_name", "classpath":
"airflow.dag_processing.bundles.git.GitDagBundle", "kwargs": {"subdir": "dags",
"tracking_ref": "main", "refresh_interval": 0}}]'
}
):
- dag_id = "dag_with_multiple_versions"
-
- for version_number in range(1, 4):
- with dag_maker(dag_id) as dag:
- for task_number in range(version_number):
- EmptyOperator(task_id=f"task{task_number + 1}")
- dag.sync_to_db()
- SerializedDagModel.write_dag(
- dag, bundle_name="dag_maker",
bundle_version=f"some_commit_hash{version_number}"
- )
- dag_maker.create_dagrun(
- run_id=f"run{version_number}",
- logical_date=datetime.datetime(2020, 1, version_number,
tzinfo=datetime.timezone.utc),
- dag_version=DagVersion.get_version(dag_id=dag_id,
version_number=version_number),
- )
yield
- clear_db_connections(False)
+ clear_db_connections(False)
+
+
+@pytest.fixture
+def make_dag_with_multiple_versions(dag_maker,
configure_git_connection_for_dag_bundle):
+ """
+ Create DAG with multiple versions
+
+ Version 1 will have 1 task, version 2 will have 2 tasks, and version 3
will have 3 tasks.
+
+ Configure the associated dag_bundles.
+ """
+
+ dag_id = "dag_with_multiple_versions"
+
+ for version_number in range(1, 4):
+ with dag_maker(dag_id) as dag:
+ for task_number in range(version_number):
+ EmptyOperator(task_id=f"task{task_number + 1}")
+ dag.sync_to_db()
+ SerializedDagModel.write_dag(
+ dag, bundle_name="dag_maker",
bundle_version=f"some_commit_hash{version_number}"
+ )
+ dag_maker.create_dagrun(
+ run_id=f"run{version_number}",
+ logical_date=datetime.datetime(2020, 1, version_number,
tzinfo=datetime.timezone.utc),
+ dag_version=DagVersion.get_version(dag_id=dag_id,
version_number=version_number),
+ )
@pytest.fixture(scope="module")
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py
b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 73322694490..743e01425ad 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -991,12 +991,14 @@ class TestPostAssetMaterialize(TestAssets):
with dag_maker(self.DAG_ASSET_NO, schedule=None, session=session):
EmptyOperator(task_id="task")
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_should_respond_200(self, test_client):
response = test_client.post("/public/assets/1/materialize")
assert response.status_code == 200
assert response.json() == {
"dag_run_id": mock.ANY,
"dag_id": self.DAG_ASSET1_ID,
+ "dag_versions": mock.ANY,
"logical_date": None,
"queued_at": mock.ANY,
"run_after": mock.ANY,
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index d1f5c310316..f12cab28468 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -18,12 +18,14 @@
from __future__ import annotations
from datetime import datetime, timedelta
+from typing import TYPE_CHECKING
from unittest import mock
import pytest
import time_machine
from sqlalchemy import select
+from airflow.api_fastapi.core_api.datamodels.dag_versions import
DagVersionResponse
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagModel, DagRun
from airflow.models.asset import AssetEvent, AssetModel
@@ -35,6 +37,7 @@ from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState, State
from airflow.utils.types import DagRunTriggeredByType, DagRunType
+from tests_common.test_utils.api_fastapi import _check_last_log
from tests_common.test_utils.db import (
clear_db_dags,
clear_db_logs,
@@ -42,7 +45,9 @@ from tests_common.test_utils.db import (
clear_db_serialized_dags,
)
from tests_common.test_utils.format_datetime import from_datetime_to_zulu,
from_datetime_to_zulu_without_ms
-from tests_common.test_utils.www import _check_last_log
+
+if TYPE_CHECKING:
+ from airflow.models.dag_version import DagVersion
pytestmark = pytest.mark.db_test
@@ -89,11 +94,7 @@ def setup(request, dag_maker, session=None):
if "no_setup" in request.keywords:
return
- with dag_maker(
- DAG1_ID,
- schedule=None,
- start_date=START_DATE1,
- ):
+ with dag_maker(DAG1_ID, schedule=None, start_date=START_DATE1,
serialized=True):
task1 = EmptyOperator(task_id="task_1")
task2 = EmptyOperator(task_id="task_2")
@@ -130,8 +131,9 @@ def setup(request, dag_maker, session=None):
ti2.task = task2
ti2.state = State.FAILED
- with dag_maker(DAG2_ID, schedule=None, start_date=START_DATE2,
params=DAG2_PARAM):
+ with dag_maker(DAG2_ID, schedule=None, start_date=START_DATE2,
params=DAG2_PARAM, serialized=True):
EmptyOperator(task_id="task_2")
+
dag_maker.create_dagrun(
run_id=DAG2_RUN1_ID,
state=DAG2_RUN1_STATE,
@@ -148,7 +150,6 @@ def setup(request, dag_maker, session=None):
)
dag_maker.sync_dagbag_to_db()
- dag_maker.dag_model
dag_maker.dag_model.has_task_concurrency_limits = True
session.merge(ti1)
session.merge(ti2)
@@ -156,6 +157,37 @@ def setup(request, dag_maker, session=None):
session.commit()
+def get_dag_versions_dict(dag_versions: list[DagVersion]) -> list[dict]:
+ return [
+ # must set mode="json" or the created_at and id will be python
datetime and UUID instead of string
+ DagVersionResponse.model_validate(dag_version,
from_attributes=True).model_dump(mode="json")
+ for dag_version in dag_versions
+ ]
+
+
+def get_dag_run_dict(run: DagRun):
+ return {
+ "dag_run_id": run.run_id,
+ "dag_id": run.dag_id,
+ "logical_date": from_datetime_to_zulu_without_ms(run.logical_date),
+ "queued_at": from_datetime_to_zulu(run.queued_at) if run.queued_at
else None,
+ "run_after": from_datetime_to_zulu_without_ms(run.run_after),
+ "start_date": from_datetime_to_zulu_without_ms(run.start_date),
+ "end_date": from_datetime_to_zulu(run.end_date),
+ "data_interval_start":
from_datetime_to_zulu_without_ms(run.data_interval_start),
+ "data_interval_end":
from_datetime_to_zulu_without_ms(run.data_interval_end),
+ "last_scheduling_decision": (
+ from_datetime_to_zulu(run.last_scheduling_decision) if
run.last_scheduling_decision else None
+ ),
+ "run_type": run.run_type,
+ "state": run.state,
+ "triggered_by": run.triggered_by.value,
+ "conf": run.conf,
+ "note": run.note,
+ "dag_versions": get_dag_versions_dict(run.dag_versions),
+ }
+
+
class TestGetDagRun:
@pytest.mark.parametrize(
"dag_id, run_id, state, run_type, triggered_by, dag_run_note",
@@ -194,6 +226,7 @@ class TestGetDagRun:
),
],
)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_get_dag_run(self, test_client, dag_id, run_id, state, run_type,
triggered_by, dag_run_note):
response = test_client.get(f"/public/dags/{dag_id}/dagRuns/{run_id}")
assert response.status_code == 200
@@ -213,29 +246,8 @@ class TestGetDagRun:
class TestGetDagRuns:
- @staticmethod
- def get_dag_run_dict(run: DagRun):
- return {
- "dag_run_id": run.run_id,
- "dag_id": run.dag_id,
- "logical_date": from_datetime_to_zulu_without_ms(run.logical_date),
- "queued_at": from_datetime_to_zulu(run.queued_at) if run.queued_at
else None,
- "run_after": from_datetime_to_zulu_without_ms(run.run_after),
- "start_date": from_datetime_to_zulu_without_ms(run.start_date),
- "end_date": from_datetime_to_zulu(run.end_date),
- "data_interval_start":
from_datetime_to_zulu_without_ms(run.data_interval_start),
- "data_interval_end":
from_datetime_to_zulu_without_ms(run.data_interval_end),
- "last_scheduling_decision": (
- from_datetime_to_zulu(run.last_scheduling_decision) if
run.last_scheduling_decision else None
- ),
- "run_type": run.run_type,
- "state": run.state,
- "triggered_by": run.triggered_by.value,
- "conf": run.conf,
- "note": run.note,
- }
-
@pytest.mark.parametrize("dag_id, total_entries", [(DAG1_ID, 2), (DAG2_ID,
2), ("~", 4)])
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_get_dag_runs(self, test_client, session, dag_id, total_entries):
response = test_client.get(f"/public/dags/{dag_id}/dagRuns")
assert response.status_code == 200
@@ -247,9 +259,9 @@ class TestGetDagRuns:
.where(DagRun.dag_id == each["dag_id"], DagRun.run_id ==
each["dag_run_id"])
.one()
)
- expected = self.get_dag_run_dict(run)
- assert each == expected
+ assert each == get_dag_run_dict(run)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_get_dag_runs_not_found(self, test_client):
response = test_client.get("/public/dags/invalid/dagRuns")
assert response.status_code == 404
@@ -279,6 +291,7 @@ class TestGetDagRuns:
pytest.param("conf", [DAG1_RUN1_ID, DAG1_RUN2_ID],
id="order_by_conf"),
],
)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_return_correct_results_with_order_by(self, test_client, order_by,
expected_order):
# Test ascending order
response = test_client.get("/public/dags/test_dag1/dagRuns",
params={"order_by": order_by})
@@ -306,6 +319,7 @@ class TestGetDagRuns:
({"limit": 1, "offset": 2}, []),
],
)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_limit_and_offset(self, test_client, query_params,
expected_dag_id_order):
response = test_client.get("/public/dags/test_dag1/dagRuns",
params=query_params)
assert response.status_code == 200
@@ -435,6 +449,7 @@ class TestGetDagRuns:
),
],
)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_filters(self, test_client, dag_id, query_params,
expected_dag_id_list):
response = test_client.get(f"/public/dags/{dag_id}/dagRuns",
params=query_params)
assert response.status_code == 200
@@ -524,30 +539,7 @@ class TestGetDagRuns:
class TestListDagRunsBatch:
- @staticmethod
- def get_dag_run_dict(run: DagRun):
- return {
- "dag_run_id": run.run_id,
- "dag_id": run.dag_id,
- "logical_date": from_datetime_to_zulu_without_ms(run.logical_date),
- "queued_at": from_datetime_to_zulu_without_ms(run.queued_at) if
run.queued_at else None,
- "run_after": from_datetime_to_zulu_without_ms(run.run_after),
- "start_date": from_datetime_to_zulu_without_ms(run.start_date),
- "end_date": from_datetime_to_zulu(run.end_date),
- "data_interval_start":
from_datetime_to_zulu_without_ms(run.data_interval_start),
- "data_interval_end":
from_datetime_to_zulu_without_ms(run.data_interval_end),
- "last_scheduling_decision": (
- from_datetime_to_zulu_without_ms(run.last_scheduling_decision)
- if run.last_scheduling_decision
- else None
- ),
- "run_type": run.run_type,
- "state": run.state,
- "triggered_by": run.triggered_by.value,
- "conf": run.conf,
- "note": run.note,
- }
-
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_list_dag_runs_return_200(self, test_client, session):
response = test_client.post("/public/dags/~/dagRuns/list", json={})
assert response.status_code == 200
@@ -555,7 +547,7 @@ class TestListDagRunsBatch:
assert body["total_entries"] == 4
for each in body["dag_runs"]:
run = session.query(DagRun).where(DagRun.run_id ==
each["dag_run_id"]).one()
- expected = self.get_dag_run_dict(run)
+ expected = get_dag_run_dict(run)
assert each == expected
def test_list_dag_runs_with_invalid_dag_id(self, test_client):
@@ -580,6 +572,7 @@ class TestListDagRunsBatch:
[["invalid"], 200, []],
],
)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_list_dag_runs_with_dag_ids_filter(self, test_client, dag_ids,
status_code, expected_dag_id_list):
response = test_client.post("/public/dags/~/dagRuns/list",
json={"dag_ids": dag_ids})
assert response.status_code == status_code
@@ -611,6 +604,7 @@ class TestListDagRunsBatch:
pytest.param("conf", DAG_RUNS_LIST, id="order_by_conf"),
],
)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_dag_runs_ordering(self, test_client, order_by, expected_order):
# Test ascending order
response = test_client.post("/public/dags/~/dagRuns/list",
json={"order_by": order_by})
@@ -638,6 +632,7 @@ class TestListDagRunsBatch:
({"page_limit": 1, "page_offset": 2}, DAG_RUNS_LIST[2:3]),
],
)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_limit_and_offset(self, test_client, post_body,
expected_dag_id_order):
response = test_client.post("/public/dags/~/dagRuns/list",
json=post_body)
assert response.status_code == 200
@@ -755,6 +750,7 @@ class TestListDagRunsBatch:
),
],
)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_filters(self, test_client, post_body, expected_dag_id_list):
response = test_client.post("/public/dags/~/dagRuns/list",
json=post_body)
assert response.status_code == 200
@@ -902,6 +898,7 @@ class TestPatchDagRun:
),
],
)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_patch_dag_run(self, test_client, dag_id, run_id, patch_body,
response_body, session):
response =
test_client.patch(f"/public/dags/{dag_id}/dagRuns/{run_id}", json=patch_body)
assert response.status_code == 200
@@ -943,6 +940,7 @@ class TestPatchDagRun:
),
],
)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_patch_dag_run_with_update_mask(
self, test_client, query_params, patch_body, response_body,
expected_status_code
):
@@ -987,6 +985,7 @@ class TestPatchDagRun:
("failed", [DagRunState.FAILED]),
],
)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_patch_dag_run_notifies_listeners(self, test_client, state,
listener_state):
from tests.listeners.class_listener import ClassBasedListener
@@ -1011,6 +1010,7 @@ class TestDeleteDagRun:
class TestGetDagRunAssetTriggerEvents:
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_should_respond_200(self, test_client, dag_maker, session):
asset1 = Asset(name="ds1", uri="file:///da1")
@@ -1085,6 +1085,7 @@ class TestGetDagRunAssetTriggerEvents:
class TestClearDagRun:
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_clear_dag_run(self, test_client, session):
response = test_client.post(
f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
@@ -1111,6 +1112,7 @@ class TestClearDagRun:
[{"only_failed": True}, DAG1_RUN2_ID, ["failed"]],
],
)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_clear_dag_run_dry_run(self, test_client, session, body,
dag_run_id, expected_state):
response =
test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/{dag_run_id}/clear",
json=body)
assert response.status_code == 200
@@ -1175,6 +1177,7 @@ class TestTriggerDagRun:
),
],
)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_should_respond_200(
self, test_client, dag_run_id, note, data_interval_start,
data_interval_end, session
):
@@ -1207,10 +1210,14 @@ class TestTriggerDagRun:
expected_data_interval_end = data_interval_end.replace("+00:00",
"Z")
expected_logical_date = fixed_now.replace("+00:00", "Z")
+ run = (
+ session.query(DagRun).where(DagRun.dag_id == DAG1_ID,
DagRun.run_id == expected_dag_run_id).one()
+ )
expected_response_json = {
"conf": {},
"dag_id": DAG1_ID,
"dag_run_id": expected_dag_run_id,
+ "dag_versions": get_dag_versions_dict(run.dag_versions),
"end_date": None,
"logical_date": expected_logical_date,
"run_after": fixed_now.replace("+00:00", "Z"),
@@ -1347,6 +1354,7 @@ class TestTriggerDagRun:
)
@time_machine.travel(timezone.utcnow(), tick=False)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_should_response_409_for_duplicate_logical_date(self, test_client):
RUN_ID_1 = "random_1"
RUN_ID_2 = "random_2"
@@ -1365,6 +1373,7 @@ class TestTriggerDagRun:
assert response_1.json() == {
"dag_run_id": RUN_ID_1,
"dag_id": DAG1_ID,
+ "dag_versions": mock.ANY,
"logical_date": now,
"queued_at": now,
"start_date": None,
@@ -1438,6 +1447,7 @@ class TestTriggerDagRun:
assert "detail" in response_json
assert list(response_json["detail"].keys()) == ["reason", "statement",
"orig_error"]
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_should_respond_200_with_null_logical_date(self, test_client):
response = test_client.post(
f"/public/dags/{DAG1_ID}/dagRuns",
@@ -1447,6 +1457,7 @@ class TestTriggerDagRun:
assert response.json() == {
"dag_run_id": mock.ANY,
"dag_id": DAG1_ID,
+ "dag_versions": mock.ANY,
"logical_date": None,
"queued_at": mock.ANY,
"run_after": mock.ANY,
diff --git a/tests/api_fastapi/core_api/routes/ui/test_dags.py
b/tests/api_fastapi/core_api/routes/ui/test_dags.py
index fc6d5093569..81c4f570912 100644
--- a/tests/api_fastapi/core_api/routes/ui/test_dags.py
+++ b/tests/api_fastapi/core_api/routes/ui/test_dags.py
@@ -83,6 +83,7 @@ class TestRecentDagRuns(TestPublicDagEndpoint):
({"dag_display_name_pattern": "test_dag2"}, [DAG2_ID], 5),
],
)
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_recent_dag_runs(self, test_client, query_params, expected_ids,
expected_total_dag_runs):
response = test_client.get("/ui/dags/recent_dag_runs",
params=query_params)
assert response.status_code == 200
@@ -92,6 +93,7 @@ class TestRecentDagRuns(TestPublicDagEndpoint):
"dag_id",
"state",
"run_after",
+ "dag_versions",
]
for recent_dag_runs in body["dags"]:
dag_runs = recent_dag_runs["latest_dag_runs"]
diff --git a/tests/cli/commands/remote_commands/test_asset_command.py
b/tests/cli/commands/remote_commands/test_asset_command.py
index d1703a6aa01..32fb07124de 100644
--- a/tests/cli/commands/remote_commands/test_asset_command.py
+++ b/tests/cli/commands/remote_commands/test_asset_command.py
@@ -122,12 +122,16 @@ def test_cli_assets_materialize(parser: ArgumentParser)
-> None:
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
asset_command.asset_materialize(args)
- run_list = json.loads(temp_stdout.getvalue())
+ output = temp_stdout.getvalue()
+ # Skip the first line of `temp_stdout` since the current `DAGRunResponse`
requires `DagBundlesManager`, which logs `INFO - DAG bundles loaded:
dags-folder, example_dags`.
+ output = "\n".join(output.splitlines()[1:])
+ run_list = json.loads(output)
assert len(run_list) == 1
# No good way to statically compare these.
- undeterministic = {
+ undeterministic: dict = {
"dag_run_id": None,
+ "dag_versions": [],
"data_interval_end": None,
"data_interval_start": None,
"logical_date": None,