This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6178491a11 Add fields to dagrun endpoint (#23440)
6178491a11 is described below
commit 6178491a117924155963586b246d2bf54be5320f
Author: Karthikeyan Singaravelan <[email protected]>
AuthorDate: Tue May 3 17:57:14 2022 +0530
Add fields to dagrun endpoint (#23440)
* Add below fields to dagrun endpoint :
* data_interval_start
* data_interval_end
* last_scheduling_decision
* run_type
* Refactor hardcoded dates with constants.
---
airflow/api_connexion/openapi/v1.yaml | 22 ++++++
airflow/api_connexion/schemas/dag_run_schema.py | 4 +
.../endpoints/test_dag_run_endpoint.py | 86 ++++++++++++++++++----
tests/api_connexion/schemas/test_dag_run_schema.py | 12 +++
4 files changed, 110 insertions(+), 14 deletions(-)
diff --git a/airflow/api_connexion/openapi/v1.yaml
b/airflow/api_connexion/openapi/v1.yaml
index d19c11aeba..9e99db032d 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2392,6 +2392,28 @@ components:
format: date-time
readOnly: true
nullable: true
+ data_interval_start:
+ type: string
+ format: date-time
+ readOnly: true
+ nullable: true
+ data_interval_end:
+ type: string
+ format: date-time
+ readOnly: true
+ nullable: true
+ last_scheduling_decision:
+ type: string
+ format: date-time
+ readOnly: true
+ nullable: true
+ run_type:
+ type: string
+ readOnly: true
+ enum:
+ - backfill
+ - manual
+ - scheduled
state:
$ref: '#/components/schemas/DagState'
readOnly: true
diff --git a/airflow/api_connexion/schemas/dag_run_schema.py
b/airflow/api_connexion/schemas/dag_run_schema.py
index 4bd91dbe37..44f6eda496 100644
--- a/airflow/api_connexion/schemas/dag_run_schema.py
+++ b/airflow/api_connexion/schemas/dag_run_schema.py
@@ -67,6 +67,10 @@ class DAGRunSchema(SQLAlchemySchema):
state = DagStateField(dump_only=True)
external_trigger = auto_field(dump_default=True, dump_only=True)
conf = ConfObject()
+ data_interval_start = auto_field(dump_only=True)
+ data_interval_end = auto_field(dump_only=True)
+ last_scheduling_decision = auto_field(dump_only=True)
+ run_type = auto_field(dump_only=True)
@pre_load
def autogenerate(self, data, **kwargs):
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 3ffb231bda..d2547587dd 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -232,6 +232,10 @@ class TestGetDagRun(TestDagRunEndpoint):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
+ 'data_interval_end': None,
+ 'data_interval_start': None,
+ 'last_scheduling_decision': None,
+ 'run_type': 'manual',
}
assert response.json == expected_response
@@ -286,6 +290,10 @@ class TestGetDagRuns(TestDagRunEndpoint):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
+ 'data_interval_end': None,
+ 'data_interval_start': None,
+ 'last_scheduling_decision': None,
+ 'run_type': 'manual',
},
{
'dag_id': 'TEST_DAG_ID',
@@ -297,6 +305,10 @@ class TestGetDagRuns(TestDagRunEndpoint):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
+ 'data_interval_end': None,
+ 'data_interval_start': None,
+ 'last_scheduling_decision': None,
+ 'run_type': 'manual',
},
],
"total_entries": 2,
@@ -348,6 +360,10 @@ class TestGetDagRuns(TestDagRunEndpoint):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
+ 'data_interval_end': None,
+ 'data_interval_start': None,
+ 'last_scheduling_decision': None,
+ 'run_type': 'manual',
},
{
'dag_id': 'TEST_DAG_ID',
@@ -359,6 +375,10 @@ class TestGetDagRuns(TestDagRunEndpoint):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
+ 'data_interval_end': None,
+ 'data_interval_start': None,
+ 'last_scheduling_decision': None,
+ 'run_type': 'manual',
},
],
"total_entries": 2,
@@ -603,6 +623,10 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
+ 'data_interval_end': None,
+ 'data_interval_start': None,
+ 'last_scheduling_decision': None,
+ 'run_type': 'manual',
},
{
'dag_id': 'TEST_DAG_ID',
@@ -614,6 +638,10 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
+ 'data_interval_end': None,
+ 'data_interval_start': None,
+ 'last_scheduling_decision': None,
+ 'run_type': 'manual',
},
],
"total_entries": 2,
@@ -652,6 +680,10 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
+ 'data_interval_end': None,
+ 'data_interval_start': None,
+ 'last_scheduling_decision': None,
+ 'run_type': 'manual',
},
{
'dag_id': 'TEST_DAG_ID',
@@ -663,6 +695,10 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
+ 'data_interval_end': None,
+ 'data_interval_start': None,
+ 'last_scheduling_decision': None,
+ 'run_type': 'manual',
},
],
"total_entries": 2,
@@ -699,6 +735,10 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
+ 'data_interval_end': None,
+ 'data_interval_start': None,
+ 'last_scheduling_decision': None,
+ 'run_type': 'manual',
},
{
'dag_id': 'TEST_DAG_ID',
@@ -710,6 +750,10 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
+ 'data_interval_end': None,
+ 'data_interval_start': None,
+ 'last_scheduling_decision': None,
+ 'run_type': 'manual',
},
],
"total_entries": 2,
@@ -1005,6 +1049,10 @@ class TestPostDagRun(TestDagRunEndpoint):
"external_trigger": True,
"start_date": None,
"state": "queued",
+ "data_interval_end": expected_logical_date,
+ "data_interval_start": expected_logical_date,
+ "last_scheduling_decision": None,
+ "run_type": "manual",
} == response.json
def test_should_respond_400_if_a_dag_has_import_errors(self, session):
@@ -1026,43 +1074,48 @@ class TestPostDagRun(TestDagRunEndpoint):
} == response.json
def
test_should_response_200_for_matching_execution_date_logical_date(self):
+ execution_date = "2020-11-10T08:25:56.939143+00:00"
+ logical_date = "2020-11-10T08:25:56.939143+00:00"
self._create_dag("TEST_DAG_ID")
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns",
json={
- "execution_date": "2020-11-10T08:25:56.939143+00:00",
- "logical_date": "2020-11-10T08:25:56.939143+00:00",
+ "execution_date": execution_date,
+ "logical_date": logical_date,
},
environ_overrides={"REMOTE_USER": "test"},
)
+ dag_run_id = f"manual__{logical_date}"
+
assert response.status_code == 200
assert {
"conf": {},
"dag_id": "TEST_DAG_ID",
- "dag_run_id": "manual__2020-11-10T08:25:56.939143+00:00",
+ "dag_run_id": dag_run_id,
"end_date": None,
- "execution_date": "2020-11-10T08:25:56.939143+00:00",
- "logical_date": "2020-11-10T08:25:56.939143+00:00",
+ "execution_date": execution_date,
+ "logical_date": logical_date,
"external_trigger": True,
"start_date": None,
"state": "queued",
+ "data_interval_end": logical_date,
+ "data_interval_start": logical_date,
+ "last_scheduling_decision": None,
+ "run_type": "manual",
} == response.json
def
test_should_response_400_for_conflicting_execution_date_logical_date(self):
+ execution_date = "2020-11-10T08:25:56.939143+00:00"
+ logical_date = "2020-11-11T08:25:56.939143+00:00"
self._create_dag("TEST_DAG_ID")
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns",
- json={
- "execution_date": "2020-11-10T08:25:56.939143+00:00",
- "logical_date": "2020-11-11T08:25:56.939143+00:00",
- },
+ json={"execution_date": execution_date, "logical_date":
logical_date},
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 400
assert response.json["title"] == "logical_date conflicts with
execution_date"
- assert response.json["detail"] == (
- "'2020-11-11T08:25:56.939143+00:00' !=
'2020-11-10T08:25:56.939143+00:00'"
- )
+ assert response.json["detail"] == (f"'{logical_date}' !=
'{execution_date}'")
@parameterized.expand(
[
@@ -1219,13 +1272,14 @@ class TestPostDagRun(TestDagRunEndpoint):
class TestPatchDagRunState(TestDagRunEndpoint):
@pytest.mark.parametrize("state", ["failed", "success"])
- def test_should_respond_200(self, state, dag_maker, session):
+ @pytest.mark.parametrize("run_type", [state.value for state in DagRunType])
+ def test_should_respond_200(self, state, run_type, dag_maker, session):
dag_id = "TEST_DAG_ID"
dag_run_id = 'TEST_DAG_RUN_ID'
with dag_maker(dag_id) as dag:
task = EmptyOperator(task_id='task_id', dag=dag)
self.app.dag_bag.bag_dag(dag, root_dag=dag)
- dr = dag_maker.create_dagrun(run_id=dag_run_id)
+ dr = dag_maker.create_dagrun(run_id=dag_run_id, run_type=run_type)
ti = dr.get_task_instance(task_id='task_id')
ti.task = task
ti.state = State.RUNNING
@@ -1254,6 +1308,10 @@ class TestPatchDagRunState(TestDagRunEndpoint):
'logical_date': dr.execution_date.isoformat(),
'start_date': dr.start_date.isoformat(),
'state': state,
+ 'data_interval_start': dr.data_interval_start.isoformat(),
+ 'data_interval_end': dr.data_interval_end.isoformat(),
+ 'last_scheduling_decision': None,
+ 'run_type': run_type,
}
@pytest.mark.parametrize('invalid_state', ["running", "queued"])
diff --git a/tests/api_connexion/schemas/test_dag_run_schema.py
b/tests/api_connexion/schemas/test_dag_run_schema.py
index 6f42ec0aa3..d97e8c3372 100644
--- a/tests/api_connexion/schemas/test_dag_run_schema.py
+++ b/tests/api_connexion/schemas/test_dag_run_schema.py
@@ -74,6 +74,10 @@ class TestDAGRunSchema(TestDAGRunBase):
"external_trigger": True,
"start_date": self.default_time,
"conf": {"start": "stop"},
+ "data_interval_end": None,
+ "data_interval_start": None,
+ "last_scheduling_decision": None,
+ "run_type": "manual",
}
@parameterized.expand(
@@ -162,6 +166,10 @@ class TestDagRunCollection(TestDAGRunBase):
"state": "running",
"start_date": self.default_time,
"conf": {"start": "stop"},
+ "data_interval_end": None,
+ "data_interval_start": None,
+ "last_scheduling_decision": None,
+ "run_type": "manual",
},
{
"dag_id": "my-dag-run",
@@ -173,6 +181,10 @@ class TestDagRunCollection(TestDAGRunBase):
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
+ "data_interval_end": None,
+ "data_interval_start": None,
+ "last_scheduling_decision": None,
+ "run_type": "manual",
},
],
"total_entries": 2,