mik-laj commented on a change in pull request #9153:
URL: https://github.com/apache/airflow/pull/9153#discussion_r436645575
##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,26 +35,471 @@ def setUpClass(cls) -> None:
def setUp(self) -> None:
self.client = self.app.test_client() # type:ignore
+ self.now = timezone.utcnow()
+ clear_db_runs()
+
+ def tearDown(self) -> None:
+ clear_db_runs()
class TestDeleteDagRun(TestDagRunEndpoint):
@pytest.mark.skip(reason="Not implemented yet")
def test_should_response_200(self):
- response =
self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+ response =
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
assert response.status_code == 204
class TestGetDagRun(TestDagRunEndpoint):
- @pytest.mark.skip(reason="Not implemented yet")
- def test_should_response_200(self):
- response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+ @provide_session
+ def test_should_response_200(self, session):
+ dagrun_model = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add(dagrun_model)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 1
+ response =
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ )
+
+ def test_should_response_404(self):
+ response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+ assert response.status_code == 404
+ self.assertEqual(
+ {
+ 'detail': None,
+ 'status': 404,
+ 'title': 'DAGRun not found',
+ 'type': 'about:blank'
+ },
+ response.json
+ )
class TestGetDagRuns(TestDagRunEndpoint):
- @pytest.mark.skip(reason="Not implemented yet")
+
+ @provide_session
+ def test_should_response_200(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=1),
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+ assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_1',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ },
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_2',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=1)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 2
+ }
+ )
+
+ @provide_session
+ def test_should_return_all_with_tilde_as_dag_id(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=1),
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ response = self.client.get("api/v1/dags/~/dagRuns")
+ assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_1',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ },
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_2',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=1)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 2
+ }
+ )
+
+ @provide_session
+ def test_handle_limit_in_query(self, session):
+ dagrun_models = [DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID' + str(i),
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=i),
+ start_date=self.now,
+ external_trigger=True,
+ ) for i in range(100)]
+
+ session.add_all(dagrun_models)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 100
+
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 1)
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID0',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=0)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 1
+ }
+ )
+
+ @provide_session
+ def test_handle_offset_in_query(self, session):
+ dagrun_models = [DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID' + str(i),
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=i),
+ start_date=self.now,
+ external_trigger=True,
+ ) for i in range(4)]
+
+ session.add_all(dagrun_models)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 4
+
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 3)
Review comment:
Yes. Separate query sound good.
##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -26,18 +34,100 @@ def delete_dag_run():
raise NotImplementedError("Not implemented yet.")
-def get_dag_run():
+@provide_session
+def get_dag_run(dag_id, dag_run_id, session):
"""
Get a DAG Run.
"""
- raise NotImplementedError("Not implemented yet.")
+ query = session.query(DagRun)
+ query = query.filter(DagRun.dag_id == dag_id)
+ query = query.filter(DagRun.run_id == dag_run_id)
+ dag_run = query.one_or_none()
+ if dag_run is None:
+ raise NotFound("DAGRun not found")
+ return dagrun_schema.dump(dag_run)
-def get_dag_runs():
+@provide_session
+def get_dag_runs(dag_id, session):
"""
Get all DAG Runs.
"""
- raise NotImplementedError("Not implemented yet.")
+ offset = request.args.get(parameters.page_offset, 0)
+ limit = min(int(request.args.get(parameters.page_limit, 100)), 100)
+
+ start_date_gte = parse_datetime_in_query(
+ request.args.get(parameters.filter_start_date_gte, None)
+ )
+ start_date_lte = parse_datetime_in_query(
+ request.args.get(parameters.filter_start_date_lte, None)
+ )
+
+ execution_date_gte = parse_datetime_in_query(
+ request.args.get(parameters.filter_execution_date_gte, None)
+ )
+ execution_date_lte = parse_datetime_in_query(
+ request.args.get(parameters.filter_execution_date_lte, None)
+ )
+
+ end_date_gte = parse_datetime_in_query(
+ request.args.get(parameters.filter_end_date_gte, None)
+ )
+ end_date_lte = parse_datetime_in_query(
+ request.args.get(parameters.filter_end_date_lte, None)
+ )
+
+ query = session.query(DagRun)
+
+ # This endpoint allows specifying ~ as the dag_id to retrieve DAG Runs
for all DAGs.
+ if dag_id == '~':
+ dag_run = query.all()
+ return dagrun_collection_schema.dump(DAGRunCollection(
+ dag_runs=dag_run,
+ total_entries=len(dag_run))
+ )
+
+ query = query.filter(DagRun.dag_id == dag_id)
Review comment:
```suggestion
if dag_id != '~':
query = query.filter(DagRun.dag_id == dag_id)
```
We still want to have filters and paginations. ~ is a wildcard. It allows
any value, so we should give up filtering in this field.
##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -26,18 +34,100 @@ def delete_dag_run():
raise NotImplementedError("Not implemented yet.")
-def get_dag_run():
+@provide_session
+def get_dag_run(dag_id, dag_run_id, session):
"""
Get a DAG Run.
"""
- raise NotImplementedError("Not implemented yet.")
+ query = session.query(DagRun)
+ query = query.filter(DagRun.dag_id == dag_id)
+ query = query.filter(DagRun.run_id == dag_run_id)
+ dag_run = query.one_or_none()
+ if dag_run is None:
+ raise NotFound("DAGRun not found")
+ return dagrun_schema.dump(dag_run)
-def get_dag_runs():
+@provide_session
+def get_dag_runs(dag_id, session):
"""
Get all DAG Runs.
"""
- raise NotImplementedError("Not implemented yet.")
+ offset = request.args.get(parameters.page_offset, 0)
+ limit = min(int(request.args.get(parameters.page_limit, 100)), 100)
+
+ start_date_gte = parse_datetime_in_query(
+ request.args.get(parameters.filter_start_date_gte, None)
+ )
+ start_date_lte = parse_datetime_in_query(
+ request.args.get(parameters.filter_start_date_lte, None)
+ )
+
+ execution_date_gte = parse_datetime_in_query(
+ request.args.get(parameters.filter_execution_date_gte, None)
+ )
+ execution_date_lte = parse_datetime_in_query(
+ request.args.get(parameters.filter_execution_date_lte, None)
+ )
+
+ end_date_gte = parse_datetime_in_query(
+ request.args.get(parameters.filter_end_date_gte, None)
+ )
+ end_date_lte = parse_datetime_in_query(
+ request.args.get(parameters.filter_end_date_lte, None)
+ )
+
+ query = session.query(DagRun)
+
+ # This endpoint allows specifying ~ as the dag_id to retrieve DAG Runs
for all DAGs.
+ if dag_id == '~':
+ dag_run = query.all()
+ return dagrun_collection_schema.dump(DAGRunCollection(
+ dag_runs=dag_run,
+ total_entries=len(dag_run))
+ )
+
+ query = query.filter(DagRun.dag_id == dag_id)
+
+ # filter start date
+ if start_date_gte and start_date_lte:
+ query = query.filter(DagRun.start_date <= start_date_lte,
+ DagRun.start_date >= start_date_gte)
+
+ elif start_date_gte and not start_date_lte:
+ query = query.filter(DagRun.start_date >= start_date_gte)
+
+ elif start_date_lte and not start_date_gte:
+ query = query.filter(DagRun.start_date <= start_date_lte)
Review comment:
```suggestion
if start_date_gte:
query = query.filter(DagRun.start_date >= start_date_gte)
if start_date_lte:
query = query.filter(DagRun.start_date <= start_date_lte)
```
Is this causing any problems?
##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -1264,6 +1264,7 @@ components:
type: string
format: date-time
readOnly: True
+ nullable: true
Review comment:
+1
##########
File path: tests/api_connexion/schemas/test_dag_run_schema.py
##########
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow.api_connexion.schemas.dag_run_schema import (
+ DAGRunCollection, dagrun_collection_schema, dagrun_schema,
+)
+from airflow.models import DagRun
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.types import DagRunType
+from tests.test_utils.db import clear_db_runs
+
+
+class TestDAGRunBase(unittest.TestCase):
+
+ def setUp(self) -> None:
+ clear_db_runs()
+ self.now = timezone.utcnow()
+
+ def tearDown(self) -> None:
+ clear_db_runs()
+
+
+class TestDAGRunSchema(TestDAGRunBase):
+
+ @provide_session
+ def test_serialzie(self, session):
+ dagrun_model = DagRun(run_id='my-dag-run',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ conf='{"start": "stop"}'
+ )
+ session.add(dagrun_model)
+ session.commit()
+ dagrun_model = session.query(DagRun).first()
+ deserialized_dagrun = dagrun_schema.dump(dagrun_model)
+
+ self.assertEqual(
+ deserialized_dagrun[0],
+ {
+ 'dag_id': None,
+ 'dag_run_id': 'my-dag-run',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': str(self.now.isoformat()),
+ 'external_trigger': True,
+ 'start_date': str(self.now.isoformat()),
+ 'conf': '{"start": "stop"}'
Review comment:
Why do we have a string here? The specification define object here.
##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,28 +35,555 @@ def setUpClass(cls) -> None:
def setUp(self) -> None:
self.client = self.app.test_client() # type:ignore
+ self.now = timezone.utcnow()
+ clear_db_runs()
+
+ def tearDown(self) -> None:
+ clear_db_runs()
class TestDeleteDagRun(TestDagRunEndpoint):
@pytest.mark.skip(reason="Not implemented yet")
def test_should_response_200(self):
- response =
self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+ response =
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
assert response.status_code == 204
class TestGetDagRun(TestDagRunEndpoint):
- @pytest.mark.skip(reason="Not implemented yet")
- def test_should_response_200(self):
- response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+ @provide_session
+ def test_should_response_200(self, session):
+ dagrun_model = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add(dagrun_model)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 1
+ response =
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ )
+
+ def test_should_response_404(self):
+ response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+ assert response.status_code == 404
+ self.assertEqual(
+ {
+ 'detail': None,
+ 'status': 404,
+ 'title': 'DAGRun not found',
+ 'type': 'about:blank'
+ },
+ response.json
+ )
class TestGetDagRuns(TestDagRunEndpoint):
- @pytest.mark.skip(reason="Not implemented yet")
- def test_should_response_200(self):
- response = self.client.get("/dags/TEST_DAG_ID/dagRuns/")
+
+ @provide_session
+ def test_should_response_200(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=1),
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+ assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_1',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ },
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_2',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=1)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 2
+ }
+ )
+
+ @provide_session
+ def test_should_return_all_with_tilde_as_dag_id(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=1),
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ response = self.client.get("api/v1/dags/~/dagRuns")
+ assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_1',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ },
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_2',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=1)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 2
+ }
+ )
+
+ @provide_session
+ def test_handle_limit_in_query(self, session):
+ dagrun_models = [DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID' + str(i),
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=i),
+ start_date=self.now,
+ external_trigger=True,
+ ) for i in range(100)]
+
+ session.add_all(dagrun_models)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 100
+
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 100)
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID0',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=0)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 100
+ }
+ )
+
+ @provide_session
+ def test_handle_offset_in_query(self, session):
+ dagrun_models = [DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID' + str(i),
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=i),
+ start_date=self.now,
+ external_trigger=True,
+ ) for i in range(4)]
+
+ session.add_all(dagrun_models)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 4
+
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 4)
+
+ @provide_session
+ def test_handle_limit_and_offset_in_query(self, session):
+ dagrun_models = [DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID' + str(i),
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=i),
+ start_date=self.now,
+ external_trigger=True,
+ ) for i in range(10)]
+
+ session.add_all(dagrun_models)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 10
+
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns?limit=6&offset=5"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 10)
Review comment:
```suggestion
class TestGetDagRunsPagination(TestDagRunEndpoint):
@parameterized.expand(
[
("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1",
["TEST_DAG_RUN_ID1"]),
("api/v1/dags/TEST_DAG_ID/dagRuns?limit=2", ["TEST_DAG_RUN_ID1",
"TEST_DAG_RUN_ID2"]),
(
"api/v1/dags/TEST_DAG_ID/dagRuns?offset=5",
[
"TEST_DAG_RUN_ID6",
"TEST_DAG_RUN_ID7",
"TEST_DAG_RUN_ID8",
"TEST_DAG_RUN_ID9",
"TEST_DAG_RUN_ID10",
],
),
(
"api/v1/dags/TEST_DAG_ID/dagRuns?offset=0",
[
"TEST_DAG_RUN_ID1",
"TEST_DAG_RUN_ID2",
"TEST_DAG_RUN_ID3",
"TEST_DAG_RUN_ID4",
"TEST_DAG_RUN_ID5",
"TEST_DAG_RUN_ID6",
"TEST_DAG_RUN_ID7",
"TEST_DAG_RUN_ID8",
"TEST_DAG_RUN_ID9",
"TEST_DAG_RUN_ID10",
],
),
("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1&offset=5",
["TEST_DAG_RUN_ID6"]),
("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1&offset=1",
["TEST_DAG_RUN_ID2"]),
(
"api/v1/dags/TEST_DAG_ID/dagRuns?limit=2&offset=2",
["TEST_DAG_RUN_ID3", "TEST_DAG_RUN_ID4"],
),
]
)
@provide_session
def test_handle_limit_and_offset(self, url, expected_dag_run_ids,
session):
dagrun_models = self._create_dag_runs(10)
session.add_all(dagrun_models)
session.commit()
response = self.client.get(url)
assert response.status_code == 200
self.assertEqual(response.json["total_entries"], 10)
dag_run_ids = [dag_run["dag_run_id"] for dag_run in
response.json["dag_runs"]]
self.assertEqual(dag_run_ids, expected_dag_run_ids)
@provide_session
def test_should_respect_page_size_limit(self, session):
dagrun_models = self._create_dag_runs(200)
session.add_all(dagrun_models)
session.commit()
response =
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns?limit=150")
assert response.status_code == 200
self.assertEqual(response.json["total_entries"], 200)
self.assertEqual(len(response.json["dag_run"]), 100)
def _create_dag_runs(self, count):
return [
DagRun(
dag_id="TEST_DAG_ID",
run_id="TEST_DAG_RUN_ID" + str(i),
run_type=DagRunType.MANUAL.value,
execution_date=self.now + timedelta(minutes=i),
start_date=self.now,
external_trigger=True,
)
for i in range(1, count + 1)
]
class TestGetDagRunsPaginationFilters(TestDagRunEndpoint):
```
Your code is a bit difficult to read. So I prepared an example to show you
some good practices.
1. If you have many tests that have tested a given feature then you can
create new classes for the new feature. It is not required that each view has
only one test class.
2. It is worth thinking what you want to check with a given test. If you
want to check pagination, you don't have to check all the fields in the
response. All you have to do is check the object identifiers. On the other
hand, if you're writing the first test in a class, it's worth it to have more
assertions to prevent regression.
3. It's a good idea to have one test data set or one data pattern for a test
group. This makes it easier for the reviewer to work because he does not have
to check that you have prepared the data correctly. If you want to check the
filter ranges (GTE / LTE) then you can create 5 objects and then use them in
other tests. You can create a new [factory method for creating test data.
##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,28 +35,555 @@ def setUpClass(cls) -> None:
def setUp(self) -> None:
self.client = self.app.test_client() # type:ignore
+ self.now = timezone.utcnow()
+ clear_db_runs()
+
+ def tearDown(self) -> None:
+ clear_db_runs()
class TestDeleteDagRun(TestDagRunEndpoint):
@pytest.mark.skip(reason="Not implemented yet")
def test_should_response_200(self):
- response =
self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+ response =
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
assert response.status_code == 204
class TestGetDagRun(TestDagRunEndpoint):
- @pytest.mark.skip(reason="Not implemented yet")
- def test_should_response_200(self):
- response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+ @provide_session
+ def test_should_response_200(self, session):
+ dagrun_model = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add(dagrun_model)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 1
+ response =
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ )
+
+ def test_should_response_404(self):
+ response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+ assert response.status_code == 404
+ self.assertEqual(
+ {
+ 'detail': None,
+ 'status': 404,
+ 'title': 'DAGRun not found',
+ 'type': 'about:blank'
+ },
+ response.json
+ )
class TestGetDagRuns(TestDagRunEndpoint):
- @pytest.mark.skip(reason="Not implemented yet")
- def test_should_response_200(self):
- response = self.client.get("/dags/TEST_DAG_ID/dagRuns/")
+
+ @provide_session
+ def test_should_response_200(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=1),
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+ assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_1',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ },
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_2',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=1)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 2
+ }
+ )
+
+ @provide_session
+ def test_should_return_all_with_tilde_as_dag_id(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=1),
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ response = self.client.get("api/v1/dags/~/dagRuns")
+ assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_1',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ },
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_2',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=1)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 2
+ }
+ )
+
+ @provide_session
+ def test_handle_limit_in_query(self, session):
+ dagrun_models = [DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID' + str(i),
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=i),
+ start_date=self.now,
+ external_trigger=True,
+ ) for i in range(100)]
+
+ session.add_all(dagrun_models)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 100
+
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 100)
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID0',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=0)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 100
+ }
+ )
+
+ @provide_session
+ def test_handle_offset_in_query(self, session):
+ dagrun_models = [DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID' + str(i),
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=i),
+ start_date=self.now,
+ external_trigger=True,
+ ) for i in range(4)]
+
+ session.add_all(dagrun_models)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 4
+
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 4)
+
+ @provide_session
+ def test_handle_limit_and_offset_in_query(self, session):
+ dagrun_models = [DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID' + str(i),
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=i),
+ start_date=self.now,
+ external_trigger=True,
+ ) for i in range(10)]
+
+ session.add_all(dagrun_models)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 10
+
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns?limit=6&offset=5"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 10)
+
+ @provide_session
+ def test_start_date_gte_and_lte(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now, # today
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=1),
+ start_date=self.now + timedelta(days=3), # next 3 days
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ start_date_gte = (self.now + timedelta(days=1)).isoformat() # gte
tomorrow
+ start_date_lte = (self.now + timedelta(days=10)).isoformat() # lte
next 10 days
+
+ response = self.client.get(
+ f"api/v1/dags/TEST_DAG_ID/dagRuns?start_date_gte={start_date_gte}"
+ f"&start_date_lte={start_date_lte}"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 2)
+ self.assertEqual(response.json.get('dag_runs')[0].get('start_date'),
+ (self.now + timedelta(days=3)).isoformat())
+
+ @provide_session
+ def test_only_start_date_gte_provided(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now, # today
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=1),
+ start_date=self.now + timedelta(days=3), # next 3 days
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ start_date_gte = (self.now + timedelta(days=1)).isoformat() # gte
tomorrow
+ response = self.client.get(
+ f"api/v1/dags/TEST_DAG_ID/dagRuns?start_date_gte={start_date_gte}"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 2)
+ self.assertEqual(response.json.get('dag_runs')[0].get('start_date'),
+ (self.now + timedelta(days=3)).isoformat())
+
+ @provide_session
+ def test_only_start_date_lte_provided(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now, # today
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=1),
+ start_date=self.now + timedelta(days=3), # next 3 days
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ assert result[0].start_date == self.now
+ start_date_lte = (self.now + timedelta(days=1)).isoformat() # lte
tomorrow
+ response = self.client.get(
+ f"api/v1/dags/TEST_DAG_ID/dagRuns?start_date_lte={start_date_lte}"
+ )
+ assert response.status_code == 200
+
+ self.assertEqual(response.json.get('total_entries'), 2)
+ self.assertEqual(response.json.get('dag_runs')[0].get('start_date'),
+ self.now.isoformat()) # today
+
+ @provide_session
+ def test_execution_date_gte_and_lte(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now, # today
+ start_date=self.now,
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(days=3), # next 3 days,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ execution_date_gte = (self.now + timedelta(days=1)).isoformat() # gte
tomorrow
+ execution_date_lte = (self.now + timedelta(days=10)).isoformat() #
lte next 10 days
+
+ response = self.client.get(
+
f"api/v1/dags/TEST_DAG_ID/dagRuns?execution_date_gte={execution_date_gte}"
+ f"&execution_date_lte={execution_date_lte}"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 2)
+
self.assertEqual(response.json.get('dag_runs')[0].get('execution_date'),
+ (self.now + timedelta(days=3)).isoformat())
+
+ @provide_session
+ def test_only_execution_date_gte_provided(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now, # today
+ start_date=self.now,
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(days=3), # next 3 days
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ execution_date_gte = (self.now + timedelta(days=1)).isoformat() # gte
tomorrow
+ response = self.client.get(
+
f"api/v1/dags/TEST_DAG_ID/dagRuns?execution_date_gte={execution_date_gte}"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 2)
+
self.assertEqual(response.json.get('dag_runs')[0].get('execution_date'),
+ (self.now + timedelta(days=3)).isoformat())
+
+ @provide_session
+ def test_only_execution_date_lte_provided(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now, # today
+ start_date=self.now,
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(days=3), # next 3 days
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ execution_date_lte = (self.now + timedelta(days=1)).isoformat() # lte
tomorrow
+ response = self.client.get(
+
f"api/v1/dags/TEST_DAG_ID/dagRuns?execution_date_lte={execution_date_lte}"
+ )
+ assert response.status_code == 200
+
+ self.assertEqual(response.json.get('total_entries'), 2)
+
self.assertEqual(response.json.get('dag_runs')[0].get('execution_date'),
+ self.now.isoformat()) # today
+
+ @provide_session
+ def test_end_date_gte_and_lte_in_query(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ state='success' # today. The end date will be today
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=1),
+ start_date=self.now + timedelta(days=3),
+ external_trigger=True,
+ ) # state is running so no end date yet
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+
+ end_date_gte = self.now.isoformat() # gte today
+ end_date_lte = (self.now + timedelta(days=1)).isoformat() # lte
tomorrow
+
+ response = self.client.get(
+ f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_gte={end_date_gte}"
+ f"&end_date_lte={end_date_lte}"
+ )
assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 2)
+ self.assertEqual(response.json.get('dag_runs')[0].get('dag_run_id'),
+ "TEST_DAG_RUN_ID_1")
+
+ @provide_session
+ def test_only_end_date_gte_provided(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ state='success' # today. End date is today
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(days=3),
+ start_date=self.now,
+ external_trigger=True,
+ ) # state is running so no end yet
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ end_date_gte = (self.now + timedelta(days=1)).isoformat() # gte
tomorrow
+ response = self.client.get(
+ f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_gte={end_date_gte}"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 2)
+
+ @provide_session
+ def test_only_end_date_lte_provided(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ state='failed' # today. End date is today
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(days=3),
+ start_date=self.now,
+ external_trigger=True,
+ ) # state is running so no end date yet
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ end_date_lte = (self.now + timedelta(days=1)).isoformat() # lte
tomorrow
+ response = self.client.get(
+ f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_lte={end_date_lte}"
+ )
+ assert response.status_code == 200
+
+ self.assertEqual(response.json.get('total_entries'), 2)
Review comment:
```suggestion
self.assertEqual(response.json['total_entries'], 2)
```
##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,28 +35,555 @@ def setUpClass(cls) -> None:
def setUp(self) -> None:
self.client = self.app.test_client() # type:ignore
+ self.now = timezone.utcnow()
+ clear_db_runs()
+
+ def tearDown(self) -> None:
+ clear_db_runs()
class TestDeleteDagRun(TestDagRunEndpoint):
@pytest.mark.skip(reason="Not implemented yet")
def test_should_response_200(self):
- response =
self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+ response =
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
assert response.status_code == 204
class TestGetDagRun(TestDagRunEndpoint):
- @pytest.mark.skip(reason="Not implemented yet")
- def test_should_response_200(self):
- response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+ @provide_session
+ def test_should_response_200(self, session):
+ dagrun_model = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add(dagrun_model)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 1
+ response =
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ )
+
+ def test_should_response_404(self):
+ response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+ assert response.status_code == 404
+ self.assertEqual(
+ {
+ 'detail': None,
+ 'status': 404,
+ 'title': 'DAGRun not found',
+ 'type': 'about:blank'
+ },
+ response.json
+ )
class TestGetDagRuns(TestDagRunEndpoint):
- @pytest.mark.skip(reason="Not implemented yet")
- def test_should_response_200(self):
- response = self.client.get("/dags/TEST_DAG_ID/dagRuns/")
+
+ @provide_session
+ def test_should_response_200(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=1),
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+ assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_1',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ },
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_2',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=1)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 2
+ }
+ )
+
+ @provide_session
+ def test_should_return_all_with_tilde_as_dag_id(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=1),
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ response = self.client.get("api/v1/dags/~/dagRuns")
+ assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_1',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ },
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_2',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=1)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 2
+ }
+ )
+
+ @provide_session
+ def test_handle_limit_in_query(self, session):
+ dagrun_models = [DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID' + str(i),
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=i),
+ start_date=self.now,
+ external_trigger=True,
+ ) for i in range(100)]
+
+ session.add_all(dagrun_models)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 100
+
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 100)
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID0',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=0)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 100
+ }
+ )
+
+ @provide_session
+ def test_handle_offset_in_query(self, session):
+ dagrun_models = [DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID' + str(i),
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=i),
+ start_date=self.now,
+ external_trigger=True,
+ ) for i in range(4)]
+
+ session.add_all(dagrun_models)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 4
+
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 4)
+
+ @provide_session
+ def test_handle_limit_and_offset_in_query(self, session):
+ dagrun_models = [DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID' + str(i),
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=i),
+ start_date=self.now,
+ external_trigger=True,
+ ) for i in range(10)]
+
+ session.add_all(dagrun_models)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 10
+
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns?limit=6&offset=5"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 10)
Review comment:
```suggestion
class TestGetDagRunsPagination(TestDagRunEndpoint):
@parameterized.expand(
[
("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1",
["TEST_DAG_RUN_ID1"]),
("api/v1/dags/TEST_DAG_ID/dagRuns?limit=2", ["TEST_DAG_RUN_ID1",
"TEST_DAG_RUN_ID2"]),
(
"api/v1/dags/TEST_DAG_ID/dagRuns?offset=5",
[
"TEST_DAG_RUN_ID6",
"TEST_DAG_RUN_ID7",
"TEST_DAG_RUN_ID8",
"TEST_DAG_RUN_ID9",
"TEST_DAG_RUN_ID10",
],
),
(
"api/v1/dags/TEST_DAG_ID/dagRuns?offset=0",
[
"TEST_DAG_RUN_ID1",
"TEST_DAG_RUN_ID2",
"TEST_DAG_RUN_ID3",
"TEST_DAG_RUN_ID4",
"TEST_DAG_RUN_ID5",
"TEST_DAG_RUN_ID6",
"TEST_DAG_RUN_ID7",
"TEST_DAG_RUN_ID8",
"TEST_DAG_RUN_ID9",
"TEST_DAG_RUN_ID10",
],
),
("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1&offset=5",
["TEST_DAG_RUN_ID6"]),
("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1&offset=1",
["TEST_DAG_RUN_ID2"]),
(
"api/v1/dags/TEST_DAG_ID/dagRuns?limit=2&offset=2",
["TEST_DAG_RUN_ID3", "TEST_DAG_RUN_ID4"],
),
]
)
@provide_session
def test_handle_limit_and_offset(self, url, expected_dag_run_ids,
session):
dagrun_models = self._create_dag_runs(10)
session.add_all(dagrun_models)
session.commit()
response = self.client.get(url)
assert response.status_code == 200
self.assertEqual(response.json["total_entries"], 10)
dag_run_ids = [dag_run["dag_run_id"] for dag_run in
response.json["dag_runs"]]
self.assertEqual(dag_run_ids, expected_dag_run_ids)
@provide_session
def test_should_respect_page_size_limit(self, session):
dagrun_models = self._create_dag_runs(200)
session.add_all(dagrun_models)
session.commit()
response =
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns?limit=150")
assert response.status_code == 200
self.assertEqual(response.json["total_entries"], 200)
self.assertEqual(len(response.json["dag_run"]), 100)
def _create_dag_runs(self, count):
return [
DagRun(
dag_id="TEST_DAG_ID",
run_id="TEST_DAG_RUN_ID" + str(i),
run_type=DagRunType.MANUAL.value,
execution_date=self.now + timedelta(minutes=i),
start_date=self.now,
external_trigger=True,
)
for i in range(1, count + 1)
]
class TestGetDagRunsPaginationFilters(TestDagRunEndpoint):
```
Your code is a bit difficult to read. So I prepared an example to show you
some good practices.
1. If you have many tests that have tested a given feature then you can
create new classes for the new feature. It is not required that each view has
only one test class.
2. It is worth thinking what you want to check with a given test. If you
want to check pagination, you don't have to check all the fields in the
response. All you have to do is check the object identifiers. On the other
hand, if you're writing the first test in a class, it's worth it to have more
assertions to prevent regression.
3. It's a good idea to have one test data set or one data pattern for a test
group. This makes it easier for the reviewer to work because he does not have
to check that you have prepared the data correctly. If you want to check the
filter ranges (GTE / LTE) then you can create 5 objects and then use them in
other tests. You can create a new [factory method for creating test data.
4. If you have many similar tests then you can use parameterized to
automatically create many tests. This means that for all filters you only need
one test, but you will have to prepare test data well. Object identifiers
should be reviewer friendly.
##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,28 +35,555 @@ def setUpClass(cls) -> None:
def setUp(self) -> None:
self.client = self.app.test_client() # type:ignore
+ self.now = timezone.utcnow()
+ clear_db_runs()
+
+ def tearDown(self) -> None:
+ clear_db_runs()
class TestDeleteDagRun(TestDagRunEndpoint):
@pytest.mark.skip(reason="Not implemented yet")
def test_should_response_200(self):
- response =
self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+ response =
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
assert response.status_code == 204
class TestGetDagRun(TestDagRunEndpoint):
- @pytest.mark.skip(reason="Not implemented yet")
- def test_should_response_200(self):
- response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+ @provide_session
+ def test_should_response_200(self, session):
+ dagrun_model = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add(dagrun_model)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 1
+ response =
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ )
+
+ def test_should_response_404(self):
+ response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+ assert response.status_code == 404
+ self.assertEqual(
+ {
+ 'detail': None,
+ 'status': 404,
+ 'title': 'DAGRun not found',
+ 'type': 'about:blank'
+ },
+ response.json
+ )
class TestGetDagRuns(TestDagRunEndpoint):
- @pytest.mark.skip(reason="Not implemented yet")
- def test_should_response_200(self):
- response = self.client.get("/dags/TEST_DAG_ID/dagRuns/")
+
+ @provide_session
+ def test_should_response_200(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=1),
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+ assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_1',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ },
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_2',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=1)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 2
+ }
+ )
+
+ @provide_session
+ def test_should_return_all_with_tilde_as_dag_id(self, session):
+ dagrun_model_1 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_1',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now,
+ start_date=self.now,
+ external_trigger=True,
+ )
+ dagrun_model_2 = DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID_2',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=1),
+ start_date=self.now,
+ external_trigger=True,
+ )
+ session.add_all([dagrun_model_1, dagrun_model_2])
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 2
+ response = self.client.get("api/v1/dags/~/dagRuns")
+ assert response.status_code == 200
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_1',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': self.now.isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ },
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_2',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=1)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 2
+ }
+ )
+
+ @provide_session
+ def test_handle_limit_in_query(self, session):
+ dagrun_models = [DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID' + str(i),
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=i),
+ start_date=self.now,
+ external_trigger=True,
+ ) for i in range(100)]
+
+ session.add_all(dagrun_models)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 100
+
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 100)
+ self.assertEqual(
+ response.json,
+ {
+ "dag_runs": [
+ {
+ 'dag_id': 'TEST_DAG_ID',
+ 'dag_run_id': 'TEST_DAG_RUN_ID0',
+ 'end_date': None,
+ 'state': 'running',
+ 'execution_date': (self.now +
timedelta(minutes=0)).isoformat(),
+ 'external_trigger': True,
+ 'start_date': self.now.isoformat(),
+ 'conf': {},
+ }
+ ],
+ "total_entries": 100
+ }
+ )
+
+ @provide_session
+ def test_handle_offset_in_query(self, session):
+ dagrun_models = [DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID' + str(i),
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=i),
+ start_date=self.now,
+ external_trigger=True,
+ ) for i in range(4)]
+
+ session.add_all(dagrun_models)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 4
+
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 4)
+
+ @provide_session
+ def test_handle_limit_and_offset_in_query(self, session):
+ dagrun_models = [DagRun(
+ dag_id='TEST_DAG_ID',
+ run_id='TEST_DAG_RUN_ID' + str(i),
+ run_type=DagRunType.MANUAL.value,
+ execution_date=self.now + timedelta(minutes=i),
+ start_date=self.now,
+ external_trigger=True,
+ ) for i in range(10)]
+
+ session.add_all(dagrun_models)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 10
+
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns?limit=6&offset=5"
+ )
+ assert response.status_code == 200
+ self.assertEqual(response.json.get('total_entries'), 10)
Review comment:
```suggestion
class TestGetDagRunsPagination(TestDagRunEndpoint):
@parameterized.expand(
[
("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1",
["TEST_DAG_RUN_ID1"]),
("api/v1/dags/TEST_DAG_ID/dagRuns?limit=2", ["TEST_DAG_RUN_ID1",
"TEST_DAG_RUN_ID2"]),
(
"api/v1/dags/TEST_DAG_ID/dagRuns?offset=5",
[
"TEST_DAG_RUN_ID6",
"TEST_DAG_RUN_ID7",
"TEST_DAG_RUN_ID8",
"TEST_DAG_RUN_ID9",
"TEST_DAG_RUN_ID10",
],
),
(
"api/v1/dags/TEST_DAG_ID/dagRuns?offset=0",
[
"TEST_DAG_RUN_ID1",
"TEST_DAG_RUN_ID2",
"TEST_DAG_RUN_ID3",
"TEST_DAG_RUN_ID4",
"TEST_DAG_RUN_ID5",
"TEST_DAG_RUN_ID6",
"TEST_DAG_RUN_ID7",
"TEST_DAG_RUN_ID8",
"TEST_DAG_RUN_ID9",
"TEST_DAG_RUN_ID10",
],
),
("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1&offset=5",
["TEST_DAG_RUN_ID6"]),
("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1&offset=1",
["TEST_DAG_RUN_ID2"]),
(
"api/v1/dags/TEST_DAG_ID/dagRuns?limit=2&offset=2",
["TEST_DAG_RUN_ID3", "TEST_DAG_RUN_ID4"],
),
]
)
@provide_session
def test_handle_limit_and_offset(self, url, expected_dag_run_ids,
session):
dagrun_models = self._create_dag_runs(10)
session.add_all(dagrun_models)
session.commit()
response = self.client.get(url)
assert response.status_code == 200
self.assertEqual(response.json["total_entries"], 10)
dag_run_ids = [dag_run["dag_run_id"] for dag_run in
response.json["dag_runs"]]
self.assertEqual(dag_run_ids, expected_dag_run_ids)
@provide_session
def test_should_respect_page_size_limit(self, session):
dagrun_models = self._create_dag_runs(200)
session.add_all(dagrun_models)
session.commit()
response =
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns?limit=150")
assert response.status_code == 200
self.assertEqual(response.json["total_entries"], 200)
self.assertEqual(len(response.json["dag_run"]), 100)
def _create_dag_runs(self, count):
return [
DagRun(
dag_id="TEST_DAG_ID",
run_id="TEST_DAG_RUN_ID" + str(i),
run_type=DagRunType.MANUAL.value,
execution_date=self.now + timedelta(minutes=i),
start_date=self.now,
external_trigger=True,
)
for i in range(1, count + 1)
]
class TestGetDagRunsPaginationFilters(TestDagRunEndpoint):
```
Your code is a bit difficult to read. So I prepared an example to show you
some good practices.
1. If you have many tests that have tested a given feature then you can
create new classes for the new feature. It is not required that each view has
only one test class.
2. It is worth thinking what you want to check with a given test. If you
want to check pagination, you don't have to check all the fields in the
response. All you have to do is check the object identifiers. On the other
hand, if you're writing the first test in a class, it's worth it to have more
assertions to prevent regression.
3. It's a good idea to have one test data set or one data pattern for a test
group. This makes it easier for the reviewer to work because he does not have
to check that you have prepared the data correctly. If you want to check the
filter ranges (GTE / LTE) then you can create 5 objects and then use them in
other tests. You can create a new [factory method for creating test data.
4. If you have many similar tests then you can use parameterized to
automatically create many tests. This means that for all filters you only need
one test, but you will have to prepare test data well. Object identifiers
should be reviewer friendly. This is not required. You can prepare a separate
set of test data for each filter.
##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,28 +35,555 @@ def setUpClass(cls) -> None:
def setUp(self) -> None:
self.client = self.app.test_client() # type:ignore
+ self.now = timezone.utcnow()
Review comment:
I would prefer to use a fixed date in tests. This allows us to use text
strings in tests. Now it is very difficult for me to check if the data format
is correct.
##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -26,18 +34,100 @@ def delete_dag_run():
raise NotImplementedError("Not implemented yet.")
-def get_dag_run():
+@provide_session
+def get_dag_run(dag_id, dag_run_id, session):
"""
Get a DAG Run.
"""
- raise NotImplementedError("Not implemented yet.")
+ query = session.query(DagRun)
+ query = query.filter(DagRun.dag_id == dag_id)
+ query = query.filter(DagRun.run_id == dag_run_id)
+ dag_run = query.one_or_none()
+ if dag_run is None:
+ raise NotFound("DAGRun not found")
+ return dagrun_schema.dump(dag_run)
-def get_dag_runs():
+@provide_session
+def get_dag_runs(dag_id, session):
"""
Get all DAG Runs.
"""
- raise NotImplementedError("Not implemented yet.")
+ offset = request.args.get(parameters.page_offset, 0)
+ limit = min(int(request.args.get(parameters.page_limit, 100)), 100)
+
+ start_date_gte = parse_datetime_in_query(
Review comment:
What do you think about
[webargs](https://webargs.readthedocs.io/en/latest/) library? Would it be
helpful to you?
##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List, NamedTuple
+
+from marshmallow import ValidationError, fields
+from marshmallow.schema import Schema
+from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
+
+from airflow.api_connexion.schemas.enum_schemas import DagState
+from airflow.models.dagrun import DagRun
+
+
+class DAGRunSchema(SQLAlchemySchema):
+ """
+ Schema for DAGRun
+ """
+
+ class Meta:
+ """ Meta """
+ model = DagRun
+
+ run_id = auto_field(dump_to='dag_run_id', load_from='dag_run_id')
+ dag_id = auto_field(dump_only=True)
+ execution_date = auto_field()
+ start_date = auto_field(dump_only=True)
+ end_date = auto_field(dump_only=True)
+ state = fields.Method('get_state', deserialize='load_state')
Review comment:
It should. be defined as a custom fields. This allows you to eliminate
some hacks from the code.
```python
class DagStateField(fields.String):
def __init__(self, **metadata):
super().__init__(**metadata)
self.validators = (
[validate.OneOf(State.dag_states)] + list(self.validators)
)
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]