This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 714a9a79a5 Add data_interval_start and data_interval_end in dagrun
create API endpoint (#36630)
714a9a79a5 is described below
commit 714a9a79a59c317f58d6bd621acba9dd4e2a4622
Author: Burak Karakan <[email protected]>
AuthorDate: Fri Jan 26 19:27:55 2024 +0000
Add data_interval_start and data_interval_end in dagrun create API endpoint
(#36630)
---
.../api_connexion/endpoints/dag_run_endpoint.py | 14 +++-
airflow/api_connexion/openapi/v1.yaml | 6 +-
airflow/api_connexion/schemas/dag_run_schema.py | 16 +++-
airflow/www/static/js/types/api-generated.ts | 10 ++-
.../endpoints/test_dag_run_endpoint.py | 89 ++++++++++++++++++++--
5 files changed, 120 insertions(+), 15 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 149af771e4..3826acb75f 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -61,6 +61,7 @@ from airflow.api_connexion.schemas.task_instance_schema
import (
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models import DagModel, DagRun
from airflow.security import permissions
+from airflow.timetables.base import DataInterval
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.db import get_query_count
from airflow.utils.log.action_logger import action_event_from_permission
@@ -336,11 +337,22 @@ def post_dag_run(*, dag_id: str, session: Session =
NEW_SESSION) -> APIResponse:
if not dagrun_instance:
try:
dag = get_airflow_app().dag_bag.get_dag(dag_id)
+
+ data_interval_start = post_body.get("data_interval_start")
+ data_interval_end = post_body.get("data_interval_end")
+ if data_interval_start and data_interval_end:
+ data_interval = DataInterval(
+ start=pendulum.instance(data_interval_start),
+ end=pendulum.instance(data_interval_end),
+ )
+ else:
+ data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
+
dag_run = dag.create_dagrun(
run_type=DagRunType.MANUAL,
run_id=run_id,
execution_date=logical_date,
- data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date),
+ data_interval=data_interval,
state=DagRunState.QUEUED,
conf=post_body.get("conf"),
external_trigger=True,
diff --git a/airflow/api_connexion/openapi/v1.yaml
b/airflow/api_connexion/openapi/v1.yaml
index 03dabfb759..a94f303a5a 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2900,12 +2900,14 @@ components:
data_interval_start:
type: string
format: date-time
- readOnly: true
+ description: |
+ The beginning of the interval the DAG run covers.
nullable: true
data_interval_end:
type: string
format: date-time
- readOnly: true
+ description: |
+ The end of the interval the DAG run covers.
nullable: true
last_scheduling_decision:
type: string
diff --git a/airflow/api_connexion/schemas/dag_run_schema.py
b/airflow/api_connexion/schemas/dag_run_schema.py
index da01751f59..6a1fc334b5 100644
--- a/airflow/api_connexion/schemas/dag_run_schema.py
+++ b/airflow/api_connexion/schemas/dag_run_schema.py
@@ -20,7 +20,7 @@ from __future__ import annotations
import json
from typing import NamedTuple
-from marshmallow import fields, post_dump, pre_load, validate
+from marshmallow import ValidationError, fields, post_dump, pre_load, validate, validates_schema
from marshmallow.schema import Schema
from marshmallow.validate import Range
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
@@ -69,8 +69,8 @@ class DAGRunSchema(SQLAlchemySchema):
state = DagStateField(dump_only=True)
external_trigger = auto_field(dump_default=True, dump_only=True)
conf = ConfObject()
- data_interval_start = auto_field(dump_only=True)
- data_interval_end = auto_field(dump_only=True)
+ data_interval_start = auto_field(validate=validate_istimezone)
+ data_interval_end = auto_field(validate=validate_istimezone)
last_scheduling_decision = auto_field(dump_only=True)
run_type = auto_field(dump_only=True)
note = auto_field(dump_only=False)
@@ -121,6 +121,16 @@ class DAGRunSchema(SQLAlchemySchema):
return ret_data
+ @validates_schema
+ def validate_data_interval_dates(self, data, **kwargs):
+ data_interval_start_exists = data.get("data_interval_start") is not None
+ data_interval_end_exists = data.get("data_interval_end") is not None
+
+ if data_interval_start_exists != data_interval_end_exists:
+ raise ValidationError(
+ "Both 'data_interval_start' and 'data_interval_end' must be specified together"
+ )
+
class SetDagRunStateFormSchema(Schema):
"""Schema for handling the request of setting state of DAG run."""
diff --git a/airflow/www/static/js/types/api-generated.ts
b/airflow/www/static/js/types/api-generated.ts
index bd205c8e69..d9e1816bbd 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -1077,9 +1077,15 @@ export interface components {
start_date?: string | null;
/** Format: date-time */
end_date?: string | null;
- /** Format: date-time */
+ /**
+ * Format: date-time
+ * @description The beginning of the interval the DAG run covers.
+ */
data_interval_start?: string | null;
- /** Format: date-time */
+ /**
+ * Format: date-time
+ * @description The end of the interval the DAG run covers.
+ */
data_interval_end?: string | null;
/** Format: date-time */
last_scheduling_decision?: string | null;
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 0ce3f222db..045b5392f5 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1159,25 +1159,51 @@ class TestGetDagRunBatchDateFilters(TestDagRunEndpoint):
class TestPostDagRun(TestDagRunEndpoint):
@pytest.mark.parametrize("logical_date_field_name", ["execution_date",
"logical_date"])
@pytest.mark.parametrize(
- "dag_run_id, logical_date, note",
+ "dag_run_id, logical_date, note, data_interval_start,
data_interval_end",
[
- pytest.param("TEST_DAG_RUN", "2020-06-11T18:00:00+00:00",
"test-note", id="all-present"),
- pytest.param(None, "2020-06-11T18:00:00+00:00", None,
id="only-date"),
- pytest.param(None, None, None, id="all-missing"),
+ pytest.param(
+ "TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note",
None, None, id="all-present"
+ ),
+ pytest.param(
+ "TEST_DAG_RUN",
+ "2024-06-11T18:00:00+00:00",
+ "test-note",
+ "2024-01-03T00:00:00+00:00",
+ "2024-01-04T05:00:00+00:00",
+ id="all-present-with-dates",
+ ),
+ pytest.param(None, "2020-06-11T18:00:00+00:00", None, None, None,
id="only-date"),
+ pytest.param(None, None, None, None, None, id="all-missing"),
],
)
- def test_should_respond_200(self, session, logical_date_field_name,
dag_run_id, logical_date, note):
+ def test_should_respond_200(
+ self,
+ session,
+ logical_date_field_name,
+ dag_run_id,
+ logical_date,
+ note,
+ data_interval_start,
+ data_interval_end,
+ ):
self._create_dag("TEST_DAG_ID")
# We'll patch airflow.utils.timezone.utcnow to always return this so we
# can check the returned dates.
fixed_now = timezone.utcnow()
+
request_json = {}
if logical_date is not None:
request_json[logical_date_field_name] = logical_date
if dag_run_id is not None:
request_json["dag_run_id"] = dag_run_id
+ if data_interval_start is not None:
+ request_json["data_interval_start"] = data_interval_start
+ if data_interval_end is not None:
+ request_json["data_interval_end"] = data_interval_end
+
request_json["note"] = note
with mock.patch("airflow.utils.timezone.utcnow", lambda: fixed_now):
response = self.client.post(
@@ -1185,6 +1211,7 @@ class TestPostDagRun(TestDagRunEndpoint):
json=request_json,
environ_overrides={"REMOTE_USER": "test"},
)
+
assert response.status_code == 200
if logical_date is None:
@@ -1195,6 +1222,13 @@ class TestPostDagRun(TestDagRunEndpoint):
expected_dag_run_id = f"manual__{expected_logical_date}"
else:
expected_dag_run_id = dag_run_id
+
+ expected_data_interval_start = expected_logical_date
+ expected_data_interval_end = expected_logical_date
+ if data_interval_start is not None and data_interval_end is not None:
+ expected_data_interval_start = data_interval_start
+ expected_data_interval_end = data_interval_end
+
assert response.json == {
"conf": {},
"dag_id": "TEST_DAG_ID",
@@ -1205,8 +1239,8 @@ class TestPostDagRun(TestDagRunEndpoint):
"external_trigger": True,
"start_date": None,
"state": "queued",
- "data_interval_end": expected_logical_date,
- "data_interval_start": expected_logical_date,
+ "data_interval_end": expected_data_interval_end,
+ "data_interval_start": expected_data_interval_start,
"last_scheduling_decision": None,
"run_type": "manual",
"note": note,
@@ -1323,6 +1357,47 @@ class TestPostDagRun(TestDagRunEndpoint):
assert response.json["title"] == "logical_date conflicts with
execution_date"
assert response.json["detail"] == (f"'{logical_date}' !=
'{execution_date}'")
+ @pytest.mark.parametrize(
+ "data_interval_start, data_interval_end, expected",
+ [
+ (
+ "2020-11-10T08:25:56.939143",
+ None,
+ "'2020-11-10T08:25:56.939143' is not a 'date-time' -
'data_interval_start'",
+ ),
+ (
+ None,
+ "2020-11-10T08:25:56.939143",
+ "'2020-11-10T08:25:56.939143' is not a 'date-time' -
'data_interval_end'",
+ ),
+ (
+ "2020-11-10T08:25:56.939143+00:00",
+ None,
+ "{'_schema': [\"Both 'data_interval_start' and
'data_interval_end' must be specified together\"]}",
+ ),
+ (
+ None,
+ "2020-11-10T08:25:56.939143+00:00",
+ "{'_schema': [\"Both 'data_interval_start' and
'data_interval_end' must be specified together\"]}",
+ ),
+ ],
+ )
+ def test_should_response_400_for_missing_start_date_or_end_date(
+ self, data_interval_start, data_interval_end, expected
+ ):
+ self._create_dag("TEST_DAG_ID")
+ response = self.client.post(
+ "api/v1/dags/TEST_DAG_ID/dagRuns",
+ json={
+ "execution_date": "2020-11-10T08:25:56.939143+00:00",
+ "data_interval_start": data_interval_start,
+ "data_interval_end": data_interval_end,
+ },
+ environ_overrides={"REMOTE_USER": "test"},
+ )
+ assert response.status_code == 400
+ assert response.json["detail"] == expected
+
@pytest.mark.parametrize(
"data, expected",
[