This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aa6f76c46de AIP-83 Logical date should be a required field when
triggering run via API (#46390)
aa6f76c46de is described below
commit aa6f76c46de2cefd2d61991e0febf4f4b29cb885
Author: vatsrahul1001 <[email protected]>
AuthorDate: Mon Feb 10 22:11:13 2025 +0530
AIP-83 Logical date should be a required field when triggering run via API
(#46390)
* restore unique constraint on logical date and make it nullable
* restore unique constraint on logical date and make it nullable
* fix migration file
* fix migration file
* refactor backfill reprocess logic
* fixing tests
* fix tests
* remove default date from logical date in dag run model
* fix task_filter
* fix failing tests
* make logical date a required field
* refactor
* remove backfill related changes
* implement review comments
* add time now to var
* fix tests
* fix review comments
* Update tests/api_fastapi/core_api/routes/public/test_dag_run.py
Co-authored-by: Wei Lee <[email protected]>
---------
Co-authored-by: Wei Lee <[email protected]>
---
airflow/api_fastapi/core_api/datamodels/dag_run.py | 18 +++---
.../api_fastapi/core_api/openapi/v1-generated.yaml | 8 +++
.../api_fastapi/core_api/routes/public/dag_run.py | 20 +++----
airflow/ui/openapi-gen/requests/schemas.gen.ts | 13 +++++
airflow/ui/openapi-gen/requests/types.gen.ts | 1 +
airflow/ui/src/queries/useTrigger.ts | 1 +
.../core_api/routes/public/test_dag_run.py | 66 ++++++++++++++++------
tests/models/test_dagrun.py | 5 +-
8 files changed, 91 insertions(+), 41 deletions(-)
diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py
b/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 78e0254f622..18be129a195 100644
--- a/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -20,11 +20,11 @@ from __future__ import annotations
from datetime import datetime
from enum import Enum
-from pydantic import AwareDatetime, Field, NonNegativeInt, computed_field,
model_validator
+import pendulum
+from pydantic import AwareDatetime, Field, NonNegativeInt, model_validator
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.models import DagRun
-from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -82,9 +82,9 @@ class TriggerDAGRunPostBody(StrictBaseModel):
"""Trigger DAG Run Serializer for POST body."""
dag_run_id: str | None = None
+ logical_date: AwareDatetime | None
data_interval_start: AwareDatetime | None = None
data_interval_end: AwareDatetime | None = None
-
conf: dict = Field(default_factory=dict)
note: str | None = None
@@ -96,18 +96,16 @@ class TriggerDAGRunPostBody(StrictBaseModel):
)
return values
+ ## when logical date is null, the run id should be generated from
run_after + random string.
+ # TODO: AIP83: we need to modify this validator after
https://github.com/apache/airflow/pull/46398 is merged
@model_validator(mode="after")
def validate_dag_run_id(self):
if not self.dag_run_id:
- self.dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL,
self.logical_date)
+ self.dag_run_id = DagRun.generate_run_id(
+ DagRunType.MANUAL, self.logical_date or pendulum.now("UTC")
+ )
return self
- # Mypy issue https://github.com/python/mypy/issues/1362
- @computed_field # type: ignore[misc]
- @property
- def logical_date(self) -> datetime:
- return timezone.utcnow()
-
class DAGRunsBatchBody(StrictBaseModel):
"""List DAG Runs body for batch endpoint."""
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 486c371682f..6d2223e6c25 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -10501,6 +10501,12 @@ components:
- type: string
- type: 'null'
title: Dag Run Id
+ logical_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Logical Date
data_interval_start:
anyOf:
- type: string
@@ -10523,6 +10529,8 @@ components:
title: Note
additionalProperties: false
type: object
+ required:
+ - logical_date
title: TriggerDAGRunPostBody
description: Trigger DAG Run Serializer for POST body.
TriggerResponse:
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 95df203a44a..066d7bc06f3 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -66,6 +66,7 @@ from airflow.listeners.listener import get_listener_manager
from airflow.models import DAG, DagModel, DagRun
from airflow.models.dag_version import DagVersion
from airflow.timetables.base import DataInterval
+from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -345,6 +346,7 @@ def trigger_dag_run(
) -> DAGRunResponse:
"""Trigger a DAG."""
dm = session.scalar(select(DagModel).where(DagModel.is_active,
DagModel.dag_id == dag_id).limit(1))
+ now = pendulum.now("UTC")
if not dm:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id:
'{dag_id}' not found")
@@ -354,7 +356,8 @@ def trigger_dag_run(
f"DAG with dag_id: '{dag_id}' has import errors and cannot be
triggered",
)
- logical_date = pendulum.instance(body.logical_date)
+ logical_date = timezone.coerce_datetime(body.logical_date)
+ coerced_logical_date = timezone.coerce_datetime(logical_date)
try:
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
@@ -365,20 +368,11 @@ def trigger_dag_run(
end=pendulum.instance(body.data_interval_end),
)
else:
- data_interval =
dag.timetable.infer_manual_data_interval(run_after=logical_date)
-
- if body.dag_run_id:
- run_id = body.dag_run_id
- else:
- run_id = dag.timetable.generate_run_id(
- run_type=DagRunType.MANUAL,
- logical_date=logical_date,
- data_interval=data_interval,
- )
+ data_interval =
dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date or now)
dag_run = dag.create_dagrun(
- run_id=run_id,
- logical_date=logical_date,
+ run_id=cast(str, body.dag_run_id),
+ logical_date=coerced_logical_date,
data_interval=data_interval,
run_after=data_interval.end,
conf=body.conf,
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 1d688c3442d..3e69736e87d 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -5816,6 +5816,18 @@ export const $TriggerDAGRunPostBody = {
],
title: "Dag Run Id",
},
+ logical_date: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Logical Date",
+ },
data_interval_start: {
anyOf: [
{
@@ -5858,6 +5870,7 @@ export const $TriggerDAGRunPostBody = {
},
additionalProperties: false,
type: "object",
+ required: ["logical_date"],
title: "TriggerDAGRunPostBody",
description: "Trigger DAG Run Serializer for POST body.",
} as const;
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index a63ec17ffad..9a2ed62122a 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1416,6 +1416,7 @@ export type TimeDelta = {
*/
export type TriggerDAGRunPostBody = {
dag_run_id?: string | null;
+ logical_date: string | null;
data_interval_start?: string | null;
data_interval_end?: string | null;
conf?: {
diff --git a/airflow/ui/src/queries/useTrigger.ts
b/airflow/ui/src/queries/useTrigger.ts
index 2c56bda6695..3b55be037c6 100644
--- a/airflow/ui/src/queries/useTrigger.ts
+++ b/airflow/ui/src/queries/useTrigger.ts
@@ -108,6 +108,7 @@ export const useTrigger = ({ dagId, onSuccessConfirm }: {
dagId: string; onSucce
dag_run_id: checkDagRunId,
data_interval_end: formattedDataIntervalEnd,
data_interval_start: formattedDataIntervalStart,
+ logical_date: null,
note: checkNote,
},
});
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index b24fd993415..3723232a7cd 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -1139,12 +1139,7 @@ class TestTriggerDagRun:
"dag_run_id, note, data_interval_start, data_interval_end",
[
("dag_run_5", "test-note", None, None),
- (
- "dag_run_6",
- "test-note",
- "2024-01-03T00:00:00+00:00",
- "2024-01-04T05:00:00+00:00",
- ),
+ ("dag_run_6", "test-note", "2024-01-03T00:00:00+00:00",
"2024-01-04T05:00:00+00:00"),
(None, None, None, None),
],
)
@@ -1153,7 +1148,7 @@ class TestTriggerDagRun:
):
fixed_now = timezone.utcnow().isoformat()
- request_json = {"note": note}
+ request_json = {"note": note, "logical_date": fixed_now}
if dag_run_id is not None:
request_json["dag_run_id"] = dag_run_id
if data_interval_start is not None:
@@ -1297,29 +1292,34 @@ class TestTriggerDagRun:
],
)
def test_invalid_data(self, test_client, post_body, expected_detail):
+ now = timezone.utcnow().isoformat()
+ post_body["logical_date"] = now
response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns",
json=post_body)
assert response.status_code == 422
assert response.json() == expected_detail
@mock.patch("airflow.models.DAG.create_dagrun")
def test_dagrun_creation_exception_is_handled(self, mock_create_dagrun,
test_client):
+ now = timezone.utcnow().isoformat()
error_message = "Encountered Error"
mock_create_dagrun.side_effect = ValueError(error_message)
- response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={})
+ response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns",
json={"logical_date": now})
assert response.status_code == 400
assert response.json() == {"detail": error_message}
def test_should_respond_404_if_a_dag_is_inactive(self, test_client,
session):
+ now = timezone.utcnow().isoformat()
self._dags_for_trigger_tests(session)
- response = test_client.post("/public/dags/inactive/dagRuns", json={})
+ response = test_client.post("/public/dags/inactive/dagRuns",
json={"logical_date": now})
assert response.status_code == 404
assert response.json()["detail"] == "DAG with dag_id: 'inactive' not
found"
def test_should_respond_400_if_a_dag_has_import_errors(self, test_client,
session):
+ now = timezone.utcnow().isoformat()
self._dags_for_trigger_tests(session)
- response = test_client.post("/public/dags/import_errors/dagRuns",
json={})
+ response = test_client.post("/public/dags/import_errors/dagRuns",
json={"logical_date": now})
assert response.status_code == 400
assert (
response.json()["detail"]
@@ -1334,11 +1334,11 @@ class TestTriggerDagRun:
note = "duplicate logical date test"
response_1 = test_client.post(
f"/public/dags/{DAG1_ID}/dagRuns",
- json={"dag_run_id": RUN_ID_1, "note": note},
+ json={"dag_run_id": RUN_ID_1, "note": note, "logical_date": now},
)
response_2 = test_client.post(
f"/public/dags/{DAG1_ID}/dagRuns",
- json={"dag_run_id": RUN_ID_2, "note": note},
+ json={"dag_run_id": RUN_ID_2, "note": note, "logical_date": now},
)
assert response_1.status_code == 200
@@ -1378,9 +1378,14 @@ class TestTriggerDagRun:
def test_should_response_422_for_missing_start_date_or_end_date(
self, test_client, data_interval_start, data_interval_end
):
+ now = timezone.utcnow().isoformat()
response = test_client.post(
f"/public/dags/{DAG1_ID}/dagRuns",
- json={"data_interval_start": data_interval_start,
"data_interval_end": data_interval_end},
+ json={
+ "data_interval_start": data_interval_start,
+ "data_interval_end": data_interval_end,
+ "logical_date": now,
+ },
)
assert response.status_code == 422
assert (
@@ -1389,21 +1394,50 @@ class TestTriggerDagRun:
)
def test_raises_validation_error_for_invalid_params(self, test_client):
+ now = timezone.utcnow().isoformat()
response = test_client.post(
f"/public/dags/{DAG2_ID}/dagRuns",
- json={"conf": {"validated_number": 5000}},
+ json={"conf": {"validated_number": 5000}, "logical_date": now},
)
assert response.status_code == 400
assert "Invalid input for param validated_number" in
response.json()["detail"]
def test_response_404(self, test_client):
- response = test_client.post("/public/dags/randoms/dagRuns", json={})
+ now = timezone.utcnow().isoformat()
+ response = test_client.post("/public/dags/randoms/dagRuns",
json={"logical_date": now})
assert response.status_code == 404
assert response.json()["detail"] == "DAG with dag_id: 'randoms' not
found"
def test_response_409(self, test_client):
- response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns",
json={"dag_run_id": DAG1_RUN1_ID})
+ now = timezone.utcnow().isoformat()
+ response = test_client.post(
+ f"/public/dags/{DAG1_ID}/dagRuns", json={"dag_run_id":
DAG1_RUN1_ID, "logical_date": now}
+ )
assert response.status_code == 409
response_json = response.json()
assert "detail" in response_json
assert list(response_json["detail"].keys()) == ["reason", "statement",
"orig_error"]
+
+ def test_should_respond_200_with_null_logical_date(self, test_client):
+ response = test_client.post(
+ f"/public/dags/{DAG1_ID}/dagRuns",
+ json={"logical_date": None},
+ )
+ assert response.status_code == 200
+ assert response.json() == {
+ "dag_run_id": mock.ANY,
+ "dag_id": DAG1_ID,
+ "logical_date": None,
+ "queued_at": mock.ANY,
+ "start_date": None,
+ "end_date": None,
+ "data_interval_start": mock.ANY,
+ "data_interval_end": mock.ANY,
+ "last_scheduling_decision": None,
+ "run_type": "manual",
+ "state": "queued",
+ "external_trigger": True,
+ "triggered_by": "rest_api",
+ "conf": {},
+ "note": None,
+ }
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index b7c2f2282c8..f4324abf485 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -356,10 +356,11 @@ class TestDagRun:
run_type=DagRunType.SCHEDULED,
start_date=DEFAULT_DATE,
)
- dr2 = dag_maker.create_dagrun_after(
- dr,
+ next_date = DEFAULT_DATE + datetime.timedelta(days=1)
+ dr2 = dag_maker.create_dagrun(
run_id="test_dagrun_no_deadlock_2",
start_date=DEFAULT_DATE + datetime.timedelta(days=1),
+ logical_date=next_date,
)
ti1_op1 = dr.get_task_instance(task_id="dop")
dr2.get_task_instance(task_id="dop")