This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aa6f76c46de AIP-83 Logical date should be a required field when triggering run via API (#46390)
aa6f76c46de is described below

commit aa6f76c46de2cefd2d61991e0febf4f4b29cb885
Author: vatsrahul1001 <[email protected]>
AuthorDate: Mon Feb 10 22:11:13 2025 +0530

    AIP-83 Logical date should be a required field when triggering run via API (#46390)
    
    * restore unique constraint on logical date and make it nullable
    
    * restore unique constraint on logical date and make it nullable
    
    * fix migration file
    
    * fix migration file
    
    * refactor backfill reprocess logic
    
    * fixing tests
    
    * fix tests
    
    * remove default date from logical date in dag run model
    
    * fix task_filter
    
    * fix failing tests
    
    * make logical date a required field
    
    * refactor
    
    * remove backfill related changes
    
    * address review comments
    
    * store the current time in a variable
    
    * fix tests
    
    * address review comments
    
    * Update tests/api_fastapi/core_api/routes/public/test_dag_run.py
    
    Co-authored-by: Wei Lee <[email protected]>
    
    ---------
    
    Co-authored-by: Wei Lee <[email protected]>
---
 airflow/api_fastapi/core_api/datamodels/dag_run.py | 18 +++---
 .../api_fastapi/core_api/openapi/v1-generated.yaml |  8 +++
 .../api_fastapi/core_api/routes/public/dag_run.py  | 20 +++----
 airflow/ui/openapi-gen/requests/schemas.gen.ts     | 13 +++++
 airflow/ui/openapi-gen/requests/types.gen.ts       |  1 +
 airflow/ui/src/queries/useTrigger.ts               |  1 +
 .../core_api/routes/public/test_dag_run.py         | 66 ++++++++++++++++------
 tests/models/test_dagrun.py                        |  5 +-
 8 files changed, 91 insertions(+), 41 deletions(-)

diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py 
b/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 78e0254f622..18be129a195 100644
--- a/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -20,11 +20,11 @@ from __future__ import annotations
 from datetime import datetime
 from enum import Enum
 
-from pydantic import AwareDatetime, Field, NonNegativeInt, computed_field, 
model_validator
+import pendulum
+from pydantic import AwareDatetime, Field, NonNegativeInt, model_validator
 
 from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
 from airflow.models import DagRun
-from airflow.utils import timezone
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
@@ -82,9 +82,9 @@ class TriggerDAGRunPostBody(StrictBaseModel):
     """Trigger DAG Run Serializer for POST body."""
 
     dag_run_id: str | None = None
+    logical_date: AwareDatetime | None
     data_interval_start: AwareDatetime | None = None
     data_interval_end: AwareDatetime | None = None
-
     conf: dict = Field(default_factory=dict)
     note: str | None = None
 
@@ -96,18 +96,16 @@ class TriggerDAGRunPostBody(StrictBaseModel):
             )
         return values
 
+    ## when logical date is null, the run id should be generated from 
run_after + random string.
+    # TODO: AIP83: we need to modify this validator after 
https://github.com/apache/airflow/pull/46398 is merged
     @model_validator(mode="after")
     def validate_dag_run_id(self):
         if not self.dag_run_id:
-            self.dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, 
self.logical_date)
+            self.dag_run_id = DagRun.generate_run_id(
+                DagRunType.MANUAL, self.logical_date or pendulum.now("UTC")
+            )
         return self
 
-    # Mypy issue https://github.com/python/mypy/issues/1362
-    @computed_field  # type: ignore[misc]
-    @property
-    def logical_date(self) -> datetime:
-        return timezone.utcnow()
-
 
 class DAGRunsBatchBody(StrictBaseModel):
     """List DAG Runs body for batch endpoint."""
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml 
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 486c371682f..6d2223e6c25 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -10501,6 +10501,12 @@ components:
           - type: string
           - type: 'null'
           title: Dag Run Id
+        logical_date:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Logical Date
         data_interval_start:
           anyOf:
           - type: string
@@ -10523,6 +10529,8 @@ components:
           title: Note
       additionalProperties: false
       type: object
+      required:
+      - logical_date
       title: TriggerDAGRunPostBody
       description: Trigger DAG Run Serializer for POST body.
     TriggerResponse:
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py 
b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 95df203a44a..066d7bc06f3 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -66,6 +66,7 @@ from airflow.listeners.listener import get_listener_manager
 from airflow.models import DAG, DagModel, DagRun
 from airflow.models.dag_version import DagVersion
 from airflow.timetables.base import DataInterval
+from airflow.utils import timezone
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
@@ -345,6 +346,7 @@ def trigger_dag_run(
 ) -> DAGRunResponse:
     """Trigger a DAG."""
     dm = session.scalar(select(DagModel).where(DagModel.is_active, 
DagModel.dag_id == dag_id).limit(1))
+    now = pendulum.now("UTC")
     if not dm:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: 
'{dag_id}' not found")
 
@@ -354,7 +356,8 @@ def trigger_dag_run(
             f"DAG with dag_id: '{dag_id}' has import errors and cannot be 
triggered",
         )
 
-    logical_date = pendulum.instance(body.logical_date)
+    logical_date = timezone.coerce_datetime(body.logical_date)
+    coerced_logical_date = timezone.coerce_datetime(logical_date)
 
     try:
         dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
@@ -365,20 +368,11 @@ def trigger_dag_run(
                 end=pendulum.instance(body.data_interval_end),
             )
         else:
-            data_interval = 
dag.timetable.infer_manual_data_interval(run_after=logical_date)
-
-        if body.dag_run_id:
-            run_id = body.dag_run_id
-        else:
-            run_id = dag.timetable.generate_run_id(
-                run_type=DagRunType.MANUAL,
-                logical_date=logical_date,
-                data_interval=data_interval,
-            )
+            data_interval = 
dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date or now)
 
         dag_run = dag.create_dagrun(
-            run_id=run_id,
-            logical_date=logical_date,
+            run_id=cast(str, body.dag_run_id),
+            logical_date=coerced_logical_date,
             data_interval=data_interval,
             run_after=data_interval.end,
             conf=body.conf,
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 1d688c3442d..3e69736e87d 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -5816,6 +5816,18 @@ export const $TriggerDAGRunPostBody = {
       ],
       title: "Dag Run Id",
     },
+    logical_date: {
+      anyOf: [
+        {
+          type: "string",
+          format: "date-time",
+        },
+        {
+          type: "null",
+        },
+      ],
+      title: "Logical Date",
+    },
     data_interval_start: {
       anyOf: [
         {
@@ -5858,6 +5870,7 @@ export const $TriggerDAGRunPostBody = {
   },
   additionalProperties: false,
   type: "object",
+  required: ["logical_date"],
   title: "TriggerDAGRunPostBody",
   description: "Trigger DAG Run Serializer for POST body.",
 } as const;
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow/ui/openapi-gen/requests/types.gen.ts
index a63ec17ffad..9a2ed62122a 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1416,6 +1416,7 @@ export type TimeDelta = {
  */
 export type TriggerDAGRunPostBody = {
   dag_run_id?: string | null;
+  logical_date: string | null;
   data_interval_start?: string | null;
   data_interval_end?: string | null;
   conf?: {
diff --git a/airflow/ui/src/queries/useTrigger.ts 
b/airflow/ui/src/queries/useTrigger.ts
index 2c56bda6695..3b55be037c6 100644
--- a/airflow/ui/src/queries/useTrigger.ts
+++ b/airflow/ui/src/queries/useTrigger.ts
@@ -108,6 +108,7 @@ export const useTrigger = ({ dagId, onSuccessConfirm }: { 
dagId: string; onSucce
         dag_run_id: checkDagRunId,
         data_interval_end: formattedDataIntervalEnd,
         data_interval_start: formattedDataIntervalStart,
+        logical_date: null,
         note: checkNote,
       },
     });
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py 
b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index b24fd993415..3723232a7cd 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -1139,12 +1139,7 @@ class TestTriggerDagRun:
         "dag_run_id, note, data_interval_start, data_interval_end",
         [
             ("dag_run_5", "test-note", None, None),
-            (
-                "dag_run_6",
-                "test-note",
-                "2024-01-03T00:00:00+00:00",
-                "2024-01-04T05:00:00+00:00",
-            ),
+            ("dag_run_6", "test-note", "2024-01-03T00:00:00+00:00", 
"2024-01-04T05:00:00+00:00"),
             (None, None, None, None),
         ],
     )
@@ -1153,7 +1148,7 @@ class TestTriggerDagRun:
     ):
         fixed_now = timezone.utcnow().isoformat()
 
-        request_json = {"note": note}
+        request_json = {"note": note, "logical_date": fixed_now}
         if dag_run_id is not None:
             request_json["dag_run_id"] = dag_run_id
         if data_interval_start is not None:
@@ -1297,29 +1292,34 @@ class TestTriggerDagRun:
         ],
     )
     def test_invalid_data(self, test_client, post_body, expected_detail):
+        now = timezone.utcnow().isoformat()
+        post_body["logical_date"] = now
         response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", 
json=post_body)
         assert response.status_code == 422
         assert response.json() == expected_detail
 
     @mock.patch("airflow.models.DAG.create_dagrun")
     def test_dagrun_creation_exception_is_handled(self, mock_create_dagrun, 
test_client):
+        now = timezone.utcnow().isoformat()
         error_message = "Encountered Error"
 
         mock_create_dagrun.side_effect = ValueError(error_message)
 
-        response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={})
+        response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", 
json={"logical_date": now})
         assert response.status_code == 400
         assert response.json() == {"detail": error_message}
 
     def test_should_respond_404_if_a_dag_is_inactive(self, test_client, 
session):
+        now = timezone.utcnow().isoformat()
         self._dags_for_trigger_tests(session)
-        response = test_client.post("/public/dags/inactive/dagRuns", json={})
+        response = test_client.post("/public/dags/inactive/dagRuns", 
json={"logical_date": now})
         assert response.status_code == 404
         assert response.json()["detail"] == "DAG with dag_id: 'inactive' not 
found"
 
     def test_should_respond_400_if_a_dag_has_import_errors(self, test_client, 
session):
+        now = timezone.utcnow().isoformat()
         self._dags_for_trigger_tests(session)
-        response = test_client.post("/public/dags/import_errors/dagRuns", 
json={})
+        response = test_client.post("/public/dags/import_errors/dagRuns", 
json={"logical_date": now})
         assert response.status_code == 400
         assert (
             response.json()["detail"]
@@ -1334,11 +1334,11 @@ class TestTriggerDagRun:
         note = "duplicate logical date test"
         response_1 = test_client.post(
             f"/public/dags/{DAG1_ID}/dagRuns",
-            json={"dag_run_id": RUN_ID_1, "note": note},
+            json={"dag_run_id": RUN_ID_1, "note": note, "logical_date": now},
         )
         response_2 = test_client.post(
             f"/public/dags/{DAG1_ID}/dagRuns",
-            json={"dag_run_id": RUN_ID_2, "note": note},
+            json={"dag_run_id": RUN_ID_2, "note": note, "logical_date": now},
         )
 
         assert response_1.status_code == 200
@@ -1378,9 +1378,14 @@ class TestTriggerDagRun:
     def test_should_response_422_for_missing_start_date_or_end_date(
         self, test_client, data_interval_start, data_interval_end
     ):
+        now = timezone.utcnow().isoformat()
         response = test_client.post(
             f"/public/dags/{DAG1_ID}/dagRuns",
-            json={"data_interval_start": data_interval_start, 
"data_interval_end": data_interval_end},
+            json={
+                "data_interval_start": data_interval_start,
+                "data_interval_end": data_interval_end,
+                "logical_date": now,
+            },
         )
         assert response.status_code == 422
         assert (
@@ -1389,21 +1394,50 @@ class TestTriggerDagRun:
         )
 
     def test_raises_validation_error_for_invalid_params(self, test_client):
+        now = timezone.utcnow().isoformat()
         response = test_client.post(
             f"/public/dags/{DAG2_ID}/dagRuns",
-            json={"conf": {"validated_number": 5000}},
+            json={"conf": {"validated_number": 5000}, "logical_date": now},
         )
         assert response.status_code == 400
         assert "Invalid input for param validated_number" in 
response.json()["detail"]
 
     def test_response_404(self, test_client):
-        response = test_client.post("/public/dags/randoms/dagRuns", json={})
+        now = timezone.utcnow().isoformat()
+        response = test_client.post("/public/dags/randoms/dagRuns", 
json={"logical_date": now})
         assert response.status_code == 404
         assert response.json()["detail"] == "DAG with dag_id: 'randoms' not 
found"
 
     def test_response_409(self, test_client):
-        response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", 
json={"dag_run_id": DAG1_RUN1_ID})
+        now = timezone.utcnow().isoformat()
+        response = test_client.post(
+            f"/public/dags/{DAG1_ID}/dagRuns", json={"dag_run_id": 
DAG1_RUN1_ID, "logical_date": now}
+        )
         assert response.status_code == 409
         response_json = response.json()
         assert "detail" in response_json
         assert list(response_json["detail"].keys()) == ["reason", "statement", 
"orig_error"]
+
+    def test_should_respond_200_with_null_logical_date(self, test_client):
+        response = test_client.post(
+            f"/public/dags/{DAG1_ID}/dagRuns",
+            json={"logical_date": None},
+        )
+        assert response.status_code == 200
+        assert response.json() == {
+            "dag_run_id": mock.ANY,
+            "dag_id": DAG1_ID,
+            "logical_date": None,
+            "queued_at": mock.ANY,
+            "start_date": None,
+            "end_date": None,
+            "data_interval_start": mock.ANY,
+            "data_interval_end": mock.ANY,
+            "last_scheduling_decision": None,
+            "run_type": "manual",
+            "state": "queued",
+            "external_trigger": True,
+            "triggered_by": "rest_api",
+            "conf": {},
+            "note": None,
+        }
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index b7c2f2282c8..f4324abf485 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -356,10 +356,11 @@ class TestDagRun:
             run_type=DagRunType.SCHEDULED,
             start_date=DEFAULT_DATE,
         )
-        dr2 = dag_maker.create_dagrun_after(
-            dr,
+        next_date = DEFAULT_DATE + datetime.timedelta(days=1)
+        dr2 = dag_maker.create_dagrun(
             run_id="test_dagrun_no_deadlock_2",
             start_date=DEFAULT_DATE + datetime.timedelta(days=1),
+            logical_date=next_date,
         )
         ti1_op1 = dr.get_task_instance(task_id="dop")
         dr2.get_task_instance(task_id="dop")

Reply via email to