mik-laj commented on a change in pull request #9153:
URL: https://github.com/apache/airflow/pull/9153#discussion_r436645575



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,26 +35,471 @@ def setUpClass(cls) -> None:
 
     def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
+        self.now = timezone.utcnow()
+        clear_db_runs()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
     @pytest.mark.skip(reason="Not implemented yet")
     def test_should_response_200(self):
-        response = self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+        response = self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 204
 
 
 class TestGetDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                'dag_id': 'TEST_DAG_ID',
+                'dag_run_id': 'TEST_DAG_RUN_ID',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': self.now.isoformat(),
+                'external_trigger': True,
+                'start_date': self.now.isoformat(),
+                'conf': {},
+            }
+        )
+
+    def test_should_response_404(self):
+        response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+        assert response.status_code == 404
+        self.assertEqual(
+            {
+                'detail': None,
+                'status': 404,
+                'title': 'DAGRun not found',
+                'type': 'about:blank'
+            },
+            response.json
+        )
 
 
 class TestGetDagRuns(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_should_return_all_with_tilde_as_dag_id(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/~/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_handle_limit_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(100)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 100
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 1)
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID0',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=0)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 1
+            }
+        )
+
+    @provide_session
+    def test_handle_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(4)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 4
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 3)

Review comment:
       Yes. A separate query sounds good.
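   For illustration, a minimal sketch of the "separate query" idea as it could look inside `get_dag_runs` (my reading of the thread; the surrounding names come from the PR, but this exact code is an assumption):
   ```python
   from sqlalchemy import func

   # Count every matching DAG Run with its own query ...
   total_entries = session.query(func.count(DagRun.id)).filter(DagRun.dag_id == dag_id).scalar()
   # ... and fetch only the requested page for serialization.
   dag_runs = query.offset(offset).limit(limit).all()
   ```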

##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -26,18 +34,100 @@ def delete_dag_run():
     raise NotImplementedError("Not implemented yet.")
 
 
-def get_dag_run():
+@provide_session
+def get_dag_run(dag_id, dag_run_id, session):
     """
     Get a DAG Run.
     """
-    raise NotImplementedError("Not implemented yet.")
+    query = session.query(DagRun)
+    query = query.filter(DagRun.dag_id == dag_id)
+    query = query.filter(DagRun.run_id == dag_run_id)
+    dag_run = query.one_or_none()
+    if dag_run is None:
+        raise NotFound("DAGRun not found")
+    return dagrun_schema.dump(dag_run)
 
 
-def get_dag_runs():
+@provide_session
+def get_dag_runs(dag_id, session):
     """
     Get all DAG Runs.
     """
-    raise NotImplementedError("Not implemented yet.")
+    offset = request.args.get(parameters.page_offset, 0)
+    limit = min(int(request.args.get(parameters.page_limit, 100)), 100)
+
+    start_date_gte = parse_datetime_in_query(
+        request.args.get(parameters.filter_start_date_gte, None)
+    )
+    start_date_lte = parse_datetime_in_query(
+        request.args.get(parameters.filter_start_date_lte, None)
+    )
+
+    execution_date_gte = parse_datetime_in_query(
+        request.args.get(parameters.filter_execution_date_gte, None)
+    )
+    execution_date_lte = parse_datetime_in_query(
+        request.args.get(parameters.filter_execution_date_lte, None)
+    )
+
+    end_date_gte = parse_datetime_in_query(
+        request.args.get(parameters.filter_end_date_gte, None)
+    )
+    end_date_lte = parse_datetime_in_query(
+        request.args.get(parameters.filter_end_date_lte, None)
+    )
+
+    query = session.query(DagRun)
+
+    #  This endpoint allows specifying ~ as the dag_id to retrieve DAG Runs for all DAGs.
+    if dag_id == '~':
+        dag_run = query.all()
+        return dagrun_collection_schema.dump(DAGRunCollection(
+            dag_runs=dag_run,
+            total_entries=len(dag_run))
+        )
+
+    query = query.filter(DagRun.dag_id == dag_id)

Review comment:
       ```suggestion
       if dag_id != '~':
           query = query.filter(DagRun.dag_id == dag_id)
   ```
   We still want to apply the filters and pagination. `~` is a wildcard that allows any value, so we should only skip filtering on this field.
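   For illustration, a minimal sketch of the suggested flow (an assumption about the surrounding endpoint code, not the PR itself):
   ```python
   query = session.query(DagRun)
   # "~" is a wildcard: skip only the dag_id filter and keep everything else.
   if dag_id != '~':
       query = query.filter(DagRun.dag_id == dag_id)
   # The date-range filters and pagination below then apply in both cases.
   ```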

##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -26,18 +34,100 @@ def delete_dag_run():
     raise NotImplementedError("Not implemented yet.")
 
 
-def get_dag_run():
+@provide_session
+def get_dag_run(dag_id, dag_run_id, session):
     """
     Get a DAG Run.
     """
-    raise NotImplementedError("Not implemented yet.")
+    query = session.query(DagRun)
+    query = query.filter(DagRun.dag_id == dag_id)
+    query = query.filter(DagRun.run_id == dag_run_id)
+    dag_run = query.one_or_none()
+    if dag_run is None:
+        raise NotFound("DAGRun not found")
+    return dagrun_schema.dump(dag_run)
 
 
-def get_dag_runs():
+@provide_session
+def get_dag_runs(dag_id, session):
     """
     Get all DAG Runs.
     """
-    raise NotImplementedError("Not implemented yet.")
+    offset = request.args.get(parameters.page_offset, 0)
+    limit = min(int(request.args.get(parameters.page_limit, 100)), 100)
+
+    start_date_gte = parse_datetime_in_query(
+        request.args.get(parameters.filter_start_date_gte, None)
+    )
+    start_date_lte = parse_datetime_in_query(
+        request.args.get(parameters.filter_start_date_lte, None)
+    )
+
+    execution_date_gte = parse_datetime_in_query(
+        request.args.get(parameters.filter_execution_date_gte, None)
+    )
+    execution_date_lte = parse_datetime_in_query(
+        request.args.get(parameters.filter_execution_date_lte, None)
+    )
+
+    end_date_gte = parse_datetime_in_query(
+        request.args.get(parameters.filter_end_date_gte, None)
+    )
+    end_date_lte = parse_datetime_in_query(
+        request.args.get(parameters.filter_end_date_lte, None)
+    )
+
+    query = session.query(DagRun)
+
+    #  This endpoint allows specifying ~ as the dag_id to retrieve DAG Runs for all DAGs.
+    if dag_id == '~':
+        dag_run = query.all()
+        return dagrun_collection_schema.dump(DAGRunCollection(
+            dag_runs=dag_run,
+            total_entries=len(dag_run))
+        )
+
+    query = query.filter(DagRun.dag_id == dag_id)
+
+    # filter start date
+    if start_date_gte and start_date_lte:
+        query = query.filter(DagRun.start_date <= start_date_lte,
+                             DagRun.start_date >= start_date_gte)
+
+    elif start_date_gte and not start_date_lte:
+        query = query.filter(DagRun.start_date >= start_date_gte)
+
+    elif start_date_lte and not start_date_gte:
+        query = query.filter(DagRun.start_date <= start_date_lte)

Review comment:
       ```suggestion
       if start_date_gte:
           query = query.filter(DagRun.start_date >= start_date_gte)
   
       if start_date_lte:
           query = query.filter(DagRun.start_date <= start_date_lte)
   ```
   Is this causing any problems?
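   As a further, purely illustrative simplification (not part of this PR), the same pattern could be factored into a small helper so the gte/lte branching is not repeated for every column:
   ```python
   def apply_range_filter(query, column, gte=None, lte=None):
       """Apply optional lower/upper bounds to a SQLAlchemy query."""
       if gte is not None:
           query = query.filter(column >= gte)
       if lte is not None:
           query = query.filter(column <= lte)
       return query

   # e.g. query = apply_range_filter(query, DagRun.start_date, start_date_gte, start_date_lte)
   ```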

##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -1264,6 +1264,7 @@ components:
           type: string
           format: date-time
           readOnly: True
+          nullable: true

Review comment:
       +1
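   For context (the field name is not visible in this hunk): if this is the DAG Run `end_date`, the tests in this PR assert `'end_date': None` for runs that are still running, so `nullable: true` matches the actual responses.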

##########
File path: tests/api_connexion/schemas/test_dag_run_schema.py
##########
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow.api_connexion.schemas.dag_run_schema import (
+    DAGRunCollection, dagrun_collection_schema, dagrun_schema,
+)
+from airflow.models import DagRun
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.types import DagRunType
+from tests.test_utils.db import clear_db_runs
+
+
+class TestDAGRunBase(unittest.TestCase):
+
+    def setUp(self) -> None:
+        clear_db_runs()
+        self.now = timezone.utcnow()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
+
+
+class TestDAGRunSchema(TestDAGRunBase):
+
+    @provide_session
+    def test_serialzie(self, session):
+        dagrun_model = DagRun(run_id='my-dag-run',
+                              run_type=DagRunType.MANUAL.value,
+                              execution_date=self.now,
+                              start_date=self.now,
+                              conf='{"start": "stop"}'
+                              )
+        session.add(dagrun_model)
+        session.commit()
+        dagrun_model = session.query(DagRun).first()
+        deserialized_dagrun = dagrun_schema.dump(dagrun_model)
+
+        self.assertEqual(
+            deserialized_dagrun[0],
+            {
+                'dag_id': None,
+                'dag_run_id': 'my-dag-run',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': str(self.now.isoformat()),
+                'external_trigger': True,
+                'start_date': str(self.now.isoformat()),
+                'conf': '{"start": "stop"}'

Review comment:
       Why do we have a string here? The specification defines an object here.
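   For illustration, a minimal sketch of one way to dump `conf` as an object when the stored value is a JSON string, as in this test (an assumption about a possible fix, not the PR's schema code):
   ```python
   import json

   from marshmallow import fields


   class ConfField(fields.Field):
       """Hypothetical field: render a JSON-string conf as a dict in dumped output."""

       def _serialize(self, value, attr, obj, **kwargs):
           if not value:
               return {}
           return json.loads(value) if isinstance(value, str) else value
   ```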

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,28 +35,555 @@ def setUpClass(cls) -> None:
 
     def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
+        self.now = timezone.utcnow()
+        clear_db_runs()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
     @pytest.mark.skip(reason="Not implemented yet")
     def test_should_response_200(self):
-        response = self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+        response = self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 204
 
 
 class TestGetDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                'dag_id': 'TEST_DAG_ID',
+                'dag_run_id': 'TEST_DAG_RUN_ID',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': self.now.isoformat(),
+                'external_trigger': True,
+                'start_date': self.now.isoformat(),
+                'conf': {},
+            }
+        )
+
+    def test_should_response_404(self):
+        response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+        assert response.status_code == 404
+        self.assertEqual(
+            {
+                'detail': None,
+                'status': 404,
+                'title': 'DAGRun not found',
+                'type': 'about:blank'
+            },
+            response.json
+        )
 
 
 class TestGetDagRuns(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_should_return_all_with_tilde_as_dag_id(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/~/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_handle_limit_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(100)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 100
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 100)
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID0',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=0)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 100
+            }
+        )
+
+    @provide_session
+    def test_handle_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(4)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 4
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 4)
+
+    @provide_session
+    def test_handle_limit_and_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(10)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 10
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=6&offset=5"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 10)

Review comment:
       ```suggestion
   class TestGetDagRunsPagination(TestDagRunEndpoint):
       @parameterized.expand(
           [
               ("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1", 
["TEST_DAG_RUN_ID1"]),
               ("api/v1/dags/TEST_DAG_ID/dagRuns?limit=2", ["TEST_DAG_RUN_ID1", 
"TEST_DAG_RUN_ID2"]),
               (
                   "api/v1/dags/TEST_DAG_ID/dagRuns?offset=5",
                   [
                       "TEST_DAG_RUN_ID6",
                       "TEST_DAG_RUN_ID7",
                       "TEST_DAG_RUN_ID8",
                       "TEST_DAG_RUN_ID9",
                       "TEST_DAG_RUN_ID10",
                   ],
               ),
               (
                   "api/v1/dags/TEST_DAG_ID/dagRuns?offset=0",
                   [
                       "TEST_DAG_RUN_ID1",
                       "TEST_DAG_RUN_ID2",
                       "TEST_DAG_RUN_ID3",
                       "TEST_DAG_RUN_ID4",
                       "TEST_DAG_RUN_ID5",
                       "TEST_DAG_RUN_ID6",
                       "TEST_DAG_RUN_ID7",
                       "TEST_DAG_RUN_ID8",
                       "TEST_DAG_RUN_ID9",
                       "TEST_DAG_RUN_ID10",
                   ],
               ),
               ("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1&offset=5", 
["TEST_DAG_RUN_ID6"]),
               ("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1&offset=1", 
["TEST_DAG_RUN_ID2"]),
               (
                   "api/v1/dags/TEST_DAG_ID/dagRuns?limit=2&offset=2",
                   ["TEST_DAG_RUN_ID3", "TEST_DAG_RUN_ID4"],
               ),
           ]
       )
       @provide_session
        def test_handle_limit_and_offset(self, url, expected_dag_run_ids, session):
           dagrun_models = self._create_dag_runs(10)
           session.add_all(dagrun_models)
           session.commit()
   
           response = self.client.get(url)
           assert response.status_code == 200
   
           self.assertEqual(response.json["total_entries"], 10)
            dag_run_ids = [dag_run["dag_run_id"] for dag_run in response.json["dag_runs"]]
           self.assertEqual(dag_run_ids, expected_dag_run_ids)
   
       @provide_session
       def test_should_respect_page_size_limit(self, session):
           dagrun_models = self._create_dag_runs(200)
           session.add_all(dagrun_models)
           session.commit()
   
            response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns?limit=150")
           assert response.status_code == 200
   
           self.assertEqual(response.json["total_entries"], 200)
            self.assertEqual(len(response.json["dag_runs"]), 100)
   
       def _create_dag_runs(self, count):
           return [
               DagRun(
                   dag_id="TEST_DAG_ID",
                   run_id="TEST_DAG_RUN_ID" + str(i),
                   run_type=DagRunType.MANUAL.value,
                   execution_date=self.now + timedelta(minutes=i),
                   start_date=self.now,
                   external_trigger=True,
               )
               for i in range(1, count + 1)
           ]
   
   
   class TestGetDagRunsPaginationFilters(TestDagRunEndpoint):
   ```
   
   Your code is a bit difficult to read, so I prepared an example to show you some good practices.
   1. If a given feature already has many tests, you can create a new class for the new feature. It is not required that each view has only one test class.
   2. It is worth thinking about what you want to check with a given test. If you want to check pagination, you don't have to check all the fields in the response; checking the object identifiers is enough. On the other hand, if you're writing the first test in a class, it's worth having more assertions to prevent regression.
   3. It's a good idea to have one test data set or one data pattern per test group. This makes the reviewer's work easier because they don't have to verify that you prepared the data correctly. If you want to check the filter ranges (GTE/LTE), you can create 5 objects and then reuse them in other tests. You can create a new factory method for creating test data.
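   As a usage note, the sketch above assumes `from parameterized import parameterized` at the top of the test module (an assumption; the import is not shown in this diff).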
   

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,28 +35,555 @@ def setUpClass(cls) -> None:
 
     def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
+        self.now = timezone.utcnow()
+        clear_db_runs()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
     @pytest.mark.skip(reason="Not implemented yet")
     def test_should_response_200(self):
-        response = self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+        response = self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 204
 
 
 class TestGetDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                'dag_id': 'TEST_DAG_ID',
+                'dag_run_id': 'TEST_DAG_RUN_ID',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': self.now.isoformat(),
+                'external_trigger': True,
+                'start_date': self.now.isoformat(),
+                'conf': {},
+            }
+        )
+
+    def test_should_response_404(self):
+        response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+        assert response.status_code == 404
+        self.assertEqual(
+            {
+                'detail': None,
+                'status': 404,
+                'title': 'DAGRun not found',
+                'type': 'about:blank'
+            },
+            response.json
+        )
 
 
 class TestGetDagRuns(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_should_return_all_with_tilde_as_dag_id(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/~/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_handle_limit_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(100)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 100
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 100)
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID0',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=0)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 100
+            }
+        )
+
+    @provide_session
+    def test_handle_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(4)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 4
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 4)
+
+    @provide_session
+    def test_handle_limit_and_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(10)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 10
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=6&offset=5"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 10)
+
+    @provide_session
+    def test_start_date_gte_and_lte(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,  # today
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now + timedelta(days=3),  # next 3 days
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        start_date_gte = (self.now + timedelta(days=1)).isoformat()  # gte tomorrow
+        start_date_lte = (self.now + timedelta(days=10)).isoformat()  # lte next 10 days
+
+        response = self.client.get(
+            f"api/v1/dags/TEST_DAG_ID/dagRuns?start_date_gte={start_date_gte}"
+            f"&start_date_lte={start_date_lte}"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 2)
+        self.assertEqual(response.json.get('dag_runs')[0].get('start_date'),
+                         (self.now + timedelta(days=3)).isoformat())
+
+    @provide_session
+    def test_only_start_date_gte_provided(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,  # today
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now + timedelta(days=3),  # next 3 days
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        start_date_gte = (self.now + timedelta(days=1)).isoformat()  # gte tomorrow
+        response = self.client.get(
+            f"api/v1/dags/TEST_DAG_ID/dagRuns?start_date_gte={start_date_gte}"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 2)
+        self.assertEqual(response.json.get('dag_runs')[0].get('start_date'),
+                         (self.now + timedelta(days=3)).isoformat())
+
+    @provide_session
+    def test_only_start_date_lte_provided(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,  # today
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now + timedelta(days=3),  # next 3 days
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        assert result[0].start_date == self.now
+        start_date_lte = (self.now + timedelta(days=1)).isoformat()  # lte tomorrow
+        response = self.client.get(
+            f"api/v1/dags/TEST_DAG_ID/dagRuns?start_date_lte={start_date_lte}"
+        )
+        assert response.status_code == 200
+
+        self.assertEqual(response.json.get('total_entries'), 2)
+        self.assertEqual(response.json.get('dag_runs')[0].get('start_date'),
+                         self.now.isoformat())  # today
+
+    @provide_session
+    def test_execution_date_gte_and_lte(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,  # today
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(days=3),  # next 3 days,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        execution_date_gte = (self.now + timedelta(days=1)).isoformat()  # gte tomorrow
+        execution_date_lte = (self.now + timedelta(days=10)).isoformat()  # lte next 10 days
+
+        response = self.client.get(
+            f"api/v1/dags/TEST_DAG_ID/dagRuns?execution_date_gte={execution_date_gte}"
+            f"&execution_date_lte={execution_date_lte}"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 2)
+        self.assertEqual(response.json.get('dag_runs')[0].get('execution_date'),
+                         (self.now + timedelta(days=3)).isoformat())
+
+    @provide_session
+    def test_only_execution_date_gte_provided(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,  # today
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(days=3),  # next 3 days
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        execution_date_gte = (self.now + timedelta(days=1)).isoformat()  # gte tomorrow
+        response = self.client.get(
+            f"api/v1/dags/TEST_DAG_ID/dagRuns?execution_date_gte={execution_date_gte}"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 2)
+        self.assertEqual(response.json.get('dag_runs')[0].get('execution_date'),
+                         (self.now + timedelta(days=3)).isoformat())
+
+    @provide_session
+    def test_only_execution_date_lte_provided(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,  # today
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(days=3),  # next 3 days
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        execution_date_lte = (self.now + timedelta(days=1)).isoformat()  # lte tomorrow
+        response = self.client.get(
+            f"api/v1/dags/TEST_DAG_ID/dagRuns?execution_date_lte={execution_date_lte}"
+        )
+        assert response.status_code == 200
+
+        self.assertEqual(response.json.get('total_entries'), 2)
+        self.assertEqual(response.json.get('dag_runs')[0].get('execution_date'),
+                         self.now.isoformat())  # today
+
+    @provide_session
+    def test_end_date_gte_and_lte_in_query(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+            state='success'  # today. The end date will be today
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now + timedelta(days=3),
+            external_trigger=True,
+        )  # state is running so no end date yet
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+
+        end_date_gte = self.now.isoformat()  # gte today
+        end_date_lte = (self.now + timedelta(days=1)).isoformat()  # lte tomorrow
+
+        response = self.client.get(
+            f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_gte={end_date_gte}"
+            f"&end_date_lte={end_date_lte}"
+        )
         assert response.status_code == 200
 
+        self.assertEqual(response.json.get('total_entries'), 2)
+        self.assertEqual(response.json.get('dag_runs')[0].get('dag_run_id'),
+                         "TEST_DAG_RUN_ID_1")
+
+    @provide_session
+    def test_only_end_date_gte_provided(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+            state='success'  # today. End date is today
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(days=3),
+            start_date=self.now,
+            external_trigger=True,
+        )  # state is running so no end yet
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        end_date_gte = (self.now + timedelta(days=1)).isoformat()  # gte tomorrow
+        response = self.client.get(
+            f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_gte={end_date_gte}"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 2)
+
+    @provide_session
+    def test_only_end_date_lte_provided(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+            state='failed'  # today. End date is today
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(days=3),
+            start_date=self.now,
+            external_trigger=True,
+        )  # state is running so no end date yet
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        end_date_lte = (self.now + timedelta(days=1)).isoformat()  # lte tomorrow
+        response = self.client.get(
+            f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_lte={end_date_lte}"
+        )
+        assert response.status_code == 200
+
+        self.assertEqual(response.json.get('total_entries'), 2)

Review comment:
       ```suggestion
           self.assertEqual(response.json['total_entries'], 2)
   ```
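   Item access (`response.json['total_entries']`) raises a clear `KeyError` when the key is missing, whereas `.get()` silently returns `None`, so failures surface more plainly; that is the usual motivation for this kind of suggestion.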

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,28 +35,555 @@ def setUpClass(cls) -> None:
 
     def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
+        self.now = timezone.utcnow()
+        clear_db_runs()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
     @pytest.mark.skip(reason="Not implemented yet")
     def test_should_response_200(self):
-        response = 
self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+        response = 
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 204
 
 
 class TestGetDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                'dag_id': 'TEST_DAG_ID',
+                'dag_run_id': 'TEST_DAG_RUN_ID',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': self.now.isoformat(),
+                'external_trigger': True,
+                'start_date': self.now.isoformat(),
+                'conf': {},
+            }
+        )
+
+    def test_should_response_404(self):
+        response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+        assert response.status_code == 404
+        self.assertEqual(
+            {
+                'detail': None,
+                'status': 404,
+                'title': 'DAGRun not found',
+                'type': 'about:blank'
+            },
+            response.json
+        )
 
 
 class TestGetDagRuns(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_should_return_all_with_tilde_as_dag_id(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/~/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_handle_limit_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(100)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 100
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 100)
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID0',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=0)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 100
+            }
+        )
+
+    @provide_session
+    def test_handle_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(4)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 4
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 4)
+
+    @provide_session
+    def test_handle_limit_and_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(10)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 10
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=6&offset=5"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 10)

Review comment:
       ```suggestion
   class TestGetDagRunsPagination(TestDagRunEndpoint):
       @parameterized.expand(
           [
               ("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1", 
["TEST_DAG_RUN_ID1"]),
               ("api/v1/dags/TEST_DAG_ID/dagRuns?limit=2", ["TEST_DAG_RUN_ID1", 
"TEST_DAG_RUN_ID2"]),
               (
                   "api/v1/dags/TEST_DAG_ID/dagRuns?offset=5",
                   [
                       "TEST_DAG_RUN_ID6",
                       "TEST_DAG_RUN_ID7",
                       "TEST_DAG_RUN_ID8",
                       "TEST_DAG_RUN_ID9",
                       "TEST_DAG_RUN_ID10",
                   ],
               ),
               (
                   "api/v1/dags/TEST_DAG_ID/dagRuns?offset=0",
                   [
                       "TEST_DAG_RUN_ID1",
                       "TEST_DAG_RUN_ID2",
                       "TEST_DAG_RUN_ID3",
                       "TEST_DAG_RUN_ID4",
                       "TEST_DAG_RUN_ID5",
                       "TEST_DAG_RUN_ID6",
                       "TEST_DAG_RUN_ID7",
                       "TEST_DAG_RUN_ID8",
                       "TEST_DAG_RUN_ID9",
                       "TEST_DAG_RUN_ID10",
                   ],
               ),
               ("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1&offset=5", 
["TEST_DAG_RUN_ID6"]),
               ("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1&offset=1", 
["TEST_DAG_RUN_ID2"]),
               (
                   "api/v1/dags/TEST_DAG_ID/dagRuns?limit=2&offset=2",
                   ["TEST_DAG_RUN_ID3", "TEST_DAG_RUN_ID4"],
               ),
           ]
       )
       @provide_session
        def test_handle_limit_and_offset(self, url, expected_dag_run_ids, session):
           dagrun_models = self._create_dag_runs(10)
           session.add_all(dagrun_models)
           session.commit()
   
           response = self.client.get(url)
           assert response.status_code == 200
   
           self.assertEqual(response.json["total_entries"], 10)
            dag_run_ids = [dag_run["dag_run_id"] for dag_run in response.json["dag_runs"]]
           self.assertEqual(dag_run_ids, expected_dag_run_ids)
   
       @provide_session
       def test_should_respect_page_size_limit(self, session):
           dagrun_models = self._create_dag_runs(200)
           session.add_all(dagrun_models)
           session.commit()
   
            response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns?limit=150")
           assert response.status_code == 200
   
           self.assertEqual(response.json["total_entries"], 200)
            self.assertEqual(len(response.json["dag_runs"]), 100)
   
       def _create_dag_runs(self, count):
           return [
               DagRun(
                   dag_id="TEST_DAG_ID",
                   run_id="TEST_DAG_RUN_ID" + str(i),
                   run_type=DagRunType.MANUAL.value,
                   execution_date=self.now + timedelta(minutes=i),
                   start_date=self.now,
                   external_trigger=True,
               )
               for i in range(1, count + 1)
           ]
   
   
   class TestGetDagRunsPaginationFilters(TestDagRunEndpoint):
   ```
   
   Your code is a bit difficult to read, so I prepared an example to show you some good practices.
   1. If you have many tests exercising a given feature, you can create a new class for that feature. It is not required that each view has only one test class.
   2. It is worth thinking about what you want to check with a given test. If you want to check pagination, you don't have to check every field in the response; checking the object identifiers is enough. On the other hand, if you're writing the first test in a class, it's worth having more assertions to prevent regressions.
   3. It's a good idea to have one test data set, or one data pattern, per test group. This makes the reviewer's work easier because they don't have to verify that you prepared the data correctly. If you want to check the filter ranges (GTE/LTE), you can create 5 objects and then reuse them in the other tests. You can also create a factory method for creating the test data.
   4. If you have many similar tests, you can use parameterized to generate them automatically. This means you only need one test for all the filters, but you will have to prepare the test data well. Object identifiers should be reviewer friendly.
   

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,28 +35,555 @@ def setUpClass(cls) -> None:
 
     def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
+        self.now = timezone.utcnow()
+        clear_db_runs()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
     @pytest.mark.skip(reason="Not implemented yet")
     def test_should_response_200(self):
-        response = self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+        response = self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 204
 
 
 class TestGetDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                'dag_id': 'TEST_DAG_ID',
+                'dag_run_id': 'TEST_DAG_RUN_ID',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': self.now.isoformat(),
+                'external_trigger': True,
+                'start_date': self.now.isoformat(),
+                'conf': {},
+            }
+        )
+
+    def test_should_response_404(self):
+        response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+        assert response.status_code == 404
+        self.assertEqual(
+            {
+                'detail': None,
+                'status': 404,
+                'title': 'DAGRun not found',
+                'type': 'about:blank'
+            },
+            response.json
+        )
 
 
 class TestGetDagRuns(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_should_return_all_with_tilde_as_dag_id(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/~/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_handle_limit_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(100)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 100
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 100)
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID0',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + timedelta(minutes=0)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 100
+            }
+        )
+
+    @provide_session
+    def test_handle_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(4)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 4
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 4)
+
+    @provide_session
+    def test_handle_limit_and_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(10)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 10
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=6&offset=5"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 10)

Review comment:
       ```suggestion
   class TestGetDagRunsPagination(TestDagRunEndpoint):
       @parameterized.expand(
           [
               ("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1", 
["TEST_DAG_RUN_ID1"]),
               ("api/v1/dags/TEST_DAG_ID/dagRuns?limit=2", ["TEST_DAG_RUN_ID1", 
"TEST_DAG_RUN_ID2"]),
               (
                   "api/v1/dags/TEST_DAG_ID/dagRuns?offset=5",
                   [
                       "TEST_DAG_RUN_ID6",
                       "TEST_DAG_RUN_ID7",
                       "TEST_DAG_RUN_ID8",
                       "TEST_DAG_RUN_ID9",
                       "TEST_DAG_RUN_ID10",
                   ],
               ),
               (
                   "api/v1/dags/TEST_DAG_ID/dagRuns?offset=0",
                   [
                       "TEST_DAG_RUN_ID1",
                       "TEST_DAG_RUN_ID2",
                       "TEST_DAG_RUN_ID3",
                       "TEST_DAG_RUN_ID4",
                       "TEST_DAG_RUN_ID5",
                       "TEST_DAG_RUN_ID6",
                       "TEST_DAG_RUN_ID7",
                       "TEST_DAG_RUN_ID8",
                       "TEST_DAG_RUN_ID9",
                       "TEST_DAG_RUN_ID10",
                   ],
               ),
               ("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1&offset=5", 
["TEST_DAG_RUN_ID6"]),
               ("api/v1/dags/TEST_DAG_ID/dagRuns?limit=1&offset=1", 
["TEST_DAG_RUN_ID2"]),
               (
                   "api/v1/dags/TEST_DAG_ID/dagRuns?limit=2&offset=2",
                   ["TEST_DAG_RUN_ID3", "TEST_DAG_RUN_ID4"],
               ),
           ]
       )
       @provide_session
        def test_handle_limit_and_offset(self, url, expected_dag_run_ids, session):
           dagrun_models = self._create_dag_runs(10)
           session.add_all(dagrun_models)
           session.commit()
   
           response = self.client.get(url)
           assert response.status_code == 200
   
           self.assertEqual(response.json["total_entries"], 10)
            dag_run_ids = [dag_run["dag_run_id"] for dag_run in response.json["dag_runs"]]
           self.assertEqual(dag_run_ids, expected_dag_run_ids)
   
       @provide_session
       def test_should_respect_page_size_limit(self, session):
           dagrun_models = self._create_dag_runs(200)
           session.add_all(dagrun_models)
           session.commit()
   
            response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns?limit=150")
           assert response.status_code == 200
   
           self.assertEqual(response.json["total_entries"], 200)
            self.assertEqual(len(response.json["dag_runs"]), 100)
   
       def _create_dag_runs(self, count):
           return [
               DagRun(
                   dag_id="TEST_DAG_ID",
                   run_id="TEST_DAG_RUN_ID" + str(i),
                   run_type=DagRunType.MANUAL.value,
                   execution_date=self.now + timedelta(minutes=i),
                   start_date=self.now,
                   external_trigger=True,
               )
               for i in range(1, count + 1)
           ]
   
   
   class TestGetDagRunsPaginationFilters(TestDagRunEndpoint):
   ```
   
   Your code is a bit difficult to read, so I prepared an example to show you some good practices.
   1. If you have many tests exercising a given feature, you can create a new class for that feature. It is not required that each view has only one test class.
   2. It is worth thinking about what you want to check with a given test. If you want to check pagination, you don't have to check every field in the response; checking the object identifiers is enough. On the other hand, if you're writing the first test in a class, it's worth having more assertions to prevent regressions.
   3. It's a good idea to have one test data set, or one data pattern, per test group. This makes the reviewer's work easier because they don't have to verify that you prepared the data correctly. If you want to check the filter ranges (GTE/LTE), you can create 5 objects and then reuse them in the other tests (a minimal sketch of such a shared data set is shown after this list). You can also create a factory method for creating the test data.
   4. If you have many similar tests, you can use parameterized to generate them automatically. This means you only need one test for all the filters, but you will have to prepare the test data well. Object identifiers should be reviewer friendly. This is not required; you can also prepare a separate set of test data for each filter.
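   For illustration, a rough sketch of such a shared data set for the GTE/LTE filter tests. The helper name, the fixed base date, and the import paths are only assumptions for this example, not code from this PR:
   ```python
   # Hypothetical shared fixture: five DAG runs with fixed, evenly spaced
   # execution dates that every date-filter (GTE/LTE) test can reuse.
   from datetime import timedelta

   from airflow.models import DagRun
   from airflow.utils import timezone
   from airflow.utils.types import DagRunType

   BASE_DATE = timezone.parse("2020-06-11T18:00:00+00:00")  # arbitrary fixed date


   def create_filter_test_dag_runs():
       return [
           DagRun(
               dag_id="TEST_DAG_ID",
               run_id="TEST_DAG_RUN_ID" + str(i),
               run_type=DagRunType.MANUAL.value,
               execution_date=BASE_DATE + timedelta(days=i),
               start_date=BASE_DATE,
               external_trigger=True,
           )
           for i in range(1, 6)
       ]
   ```
   With one well-known data set like this, each range-filter test only has to assert which run ids come back.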
   

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,28 +35,555 @@ def setUpClass(cls) -> None:
 
     def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
+        self.now = timezone.utcnow()

Review comment:
       I would prefer to use a fixed date in tests. This allows us to use literal strings in the assertions. Right now it is very difficult for me to check whether the date format is correct.
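   For example, something along these lines (only a sketch; the exact timestamp and the `default_time` name are arbitrary):
   ```python
   import unittest

   from airflow.utils import timezone


   class TestDagRunEndpoint(unittest.TestCase):
       default_time = "2020-06-11T18:00:00+00:00"  # fixed, human-readable timestamp

       def setUp(self) -> None:
           # Pin the clock instead of calling timezone.utcnow(), so the
           # assertions can compare against the literal string above.
           self.now = timezone.parse(self.default_time)
   ```
   Then an assertion can use, for example, `'execution_date': self.default_time` instead of `self.now.isoformat()`.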

##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -26,18 +34,100 @@ def delete_dag_run():
     raise NotImplementedError("Not implemented yet.")
 
 
-def get_dag_run():
+@provide_session
+def get_dag_run(dag_id, dag_run_id, session):
     """
     Get a DAG Run.
     """
-    raise NotImplementedError("Not implemented yet.")
+    query = session.query(DagRun)
+    query = query.filter(DagRun.dag_id == dag_id)
+    query = query.filter(DagRun.run_id == dag_run_id)
+    dag_run = query.one_or_none()
+    if dag_run is None:
+        raise NotFound("DAGRun not found")
+    return dagrun_schema.dump(dag_run)
 
 
-def get_dag_runs():
+@provide_session
+def get_dag_runs(dag_id, session):
     """
     Get all DAG Runs.
     """
-    raise NotImplementedError("Not implemented yet.")
+    offset = request.args.get(parameters.page_offset, 0)
+    limit = min(int(request.args.get(parameters.page_limit, 100)), 100)
+
+    start_date_gte = parse_datetime_in_query(

Review comment:
       What do you think about the [webargs](https://webargs.readthedocs.io/en/latest/) library? Would it be helpful to you?
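   For illustration, the query parsing could look roughly like this. This is only a sketch, assuming webargs 6.x with its Flask parser (where `location="query"` selects the query string and `missing=` sets a default); it is not code from this PR:
   ```python
   from webargs import fields
   from webargs.flaskparser import use_kwargs


   @use_kwargs(
       {
           "limit": fields.Int(missing=100),
           "offset": fields.Int(missing=0),
           "start_date_gte": fields.DateTime(),
           "start_date_lte": fields.DateTime(),
       },
       location="query",
   )
   def get_dag_runs(dag_id, limit=100, offset=0, start_date_gte=None, start_date_lte=None):
       # webargs has already validated and deserialized the query string here,
       # so the view body only deals with typed values.
       ...
   ```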
   

##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List, NamedTuple
+
+from marshmallow import ValidationError, fields
+from marshmallow.schema import Schema
+from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
+
+from airflow.api_connexion.schemas.enum_schemas import DagState
+from airflow.models.dagrun import DagRun
+
+
+class DAGRunSchema(SQLAlchemySchema):
+    """
+    Schema for DAGRun
+    """
+
+    class Meta:
+        """ Meta """
+        model = DagRun
+
+    run_id = auto_field(dump_to='dag_run_id', load_from='dag_run_id')
+    dag_id = auto_field(dump_only=True)
+    execution_date = auto_field()
+    start_date = auto_field(dump_only=True)
+    end_date = auto_field(dump_only=True)
+    state = fields.Method('get_state', deserialize='load_state')

Review comment:
       It should be defined as a custom field. This allows you to eliminate some hacks from the code:
   ```python
   from marshmallow import fields, validate

   from airflow.utils.state import State


   class DagStateField(fields.String):
       """String field that only accepts valid DAG states."""

       def __init__(self, **metadata):
           super().__init__(**metadata)
           self.validators = [validate.OneOf(State.dag_states)] + list(self.validators)
   ```
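   Then the schema can use the field directly, for example (sketch):
   ```python
   class DAGRunSchema(SQLAlchemySchema):
       ...
       # Replaces the fields.Method("get_state", deserialize="load_state") hack;
       # add dump_only=True if the state should never be loaded from requests.
       state = DagStateField()
   ```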




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

