ephraimbuddy commented on a change in pull request #9153:
URL: https://github.com/apache/airflow/pull/9153#discussion_r436643921



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,26 +35,471 @@ def setUpClass(cls) -> None:
 
     def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
+        self.now = timezone.utcnow()
+        clear_db_runs()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
     @pytest.mark.skip(reason="Not implemented yet")
     def test_should_response_200(self):
-        response = 
self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+        response = 
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 204
 
 
 class TestGetDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        response = 
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                'dag_id': 'TEST_DAG_ID',
+                'dag_run_id': 'TEST_DAG_RUN_ID',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': self.now.isoformat(),
+                'external_trigger': True,
+                'start_date': self.now.isoformat(),
+                'conf': {},
+            }
+        )
+
+    def test_should_response_404(self):
+        response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+        assert response.status_code == 404
+        self.assertEqual(
+            {
+                'detail': None,
+                'status': 404,
+                'title': 'DAGRun not found',
+                'type': 'about:blank'
+            },
+            response.json
+        )
 
 
 class TestGetDagRuns(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_should_return_all_with_tilde_as_dag_id(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/~/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_handle_limit_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(100)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 100
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 1)
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID0',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=0)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 1
+            }
+        )
+
+    @provide_session
+    def test_handle_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(4)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 4
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 3)

Review comment:
       Looks like the offset=1 is what is making it to return 3 instead of 4. I 
have tried different ways for the schema and total_entries but it always come 
as 3 instead of 4. I used pdb debugger and found out the offset is the cause. 
The query it runs is this:
       
       result2 = session.query(DagRun).offset(1).limit(100).all()
       assert len(result2) == 3
   
   The number of DagRun in db is 4 and it offsets from 1 returning the rest.
   Should I run a separate query that returns all entries and assign it to 
total_entries?

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,26 +35,471 @@ def setUpClass(cls) -> None:
 
     def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
+        self.now = timezone.utcnow()
+        clear_db_runs()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
     @pytest.mark.skip(reason="Not implemented yet")
     def test_should_response_200(self):
-        response = 
self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+        response = 
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 204
 
 
 class TestGetDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        response = 
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                'dag_id': 'TEST_DAG_ID',
+                'dag_run_id': 'TEST_DAG_RUN_ID',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': self.now.isoformat(),
+                'external_trigger': True,
+                'start_date': self.now.isoformat(),
+                'conf': {},
+            }
+        )
+
+    def test_should_response_404(self):
+        response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+        assert response.status_code == 404
+        self.assertEqual(
+            {
+                'detail': None,
+                'status': 404,
+                'title': 'DAGRun not found',
+                'type': 'about:blank'
+            },
+            response.json
+        )
 
 
 class TestGetDagRuns(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_should_return_all_with_tilde_as_dag_id(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/~/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_handle_limit_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(100)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 100
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 1)
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID0',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=0)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 1
+            }
+        )
+
+    @provide_session
+    def test_handle_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(4)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 4
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 3)

Review comment:
       Looks like the offset=1 is what is making it to return 3 instead of 4. I 
have tried different ways for the schema and total_entries definition but it 
always come as 3 instead of 4. I used pdb debugger and found out the offset is 
the cause. The query it runs is this:
       
       result2 = session.query(DagRun).offset(1).limit(100).all()
       assert len(result2) == 3
   
   The number of DagRun in db is 4 and it offsets from 1 returning the rest.
   Should I run a separate query that returns all entries and assign it to 
total_entries?

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,26 +35,471 @@ def setUpClass(cls) -> None:
 
     def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
+        self.now = timezone.utcnow()
+        clear_db_runs()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
     @pytest.mark.skip(reason="Not implemented yet")
     def test_should_response_200(self):
-        response = 
self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+        response = 
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 204
 
 
 class TestGetDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        response = 
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                'dag_id': 'TEST_DAG_ID',
+                'dag_run_id': 'TEST_DAG_RUN_ID',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': self.now.isoformat(),
+                'external_trigger': True,
+                'start_date': self.now.isoformat(),
+                'conf': {},
+            }
+        )
+
+    def test_should_response_404(self):
+        response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+        assert response.status_code == 404
+        self.assertEqual(
+            {
+                'detail': None,
+                'status': 404,
+                'title': 'DAGRun not found',
+                'type': 'about:blank'
+            },
+            response.json
+        )
 
 
 class TestGetDagRuns(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_should_return_all_with_tilde_as_dag_id(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/~/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_handle_limit_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(100)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 100
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 1)
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID0',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=0)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 1
+            }
+        )
+
+    @provide_session
+    def test_handle_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(4)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 4
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 3)

Review comment:
       Look what I found when I ran webserver
   **Without Filter Top**
   
![withoutfiltertop](https://user-images.githubusercontent.com/4122866/84039791-af0a4e00-a999-11ea-9e14-dc183ae48d33.png)
   **With Filter Top**
   
![withfiltertop](https://user-images.githubusercontent.com/4122866/84039923-db25cf00-a999-11ea-922b-87f76770338a.png)
   **Without Filter Bottom**
   
![withoutfilterbottom](https://user-images.githubusercontent.com/4122866/84039990-f1338f80-a999-11ea-9fb1-5d4fdddc3a77.png)
   **With Filter Bottom**
   
![withfilterbottom](https://user-images.githubusercontent.com/4122866/84040041-06102300-a99a-11ea-9df0-8b3cc861df71.png)
   
   

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,26 +35,471 @@ def setUpClass(cls) -> None:
 
     def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
+        self.now = timezone.utcnow()
+        clear_db_runs()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
     @pytest.mark.skip(reason="Not implemented yet")
     def test_should_response_200(self):
-        response = 
self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+        response = 
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 204
 
 
 class TestGetDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        response = 
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                'dag_id': 'TEST_DAG_ID',
+                'dag_run_id': 'TEST_DAG_RUN_ID',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': self.now.isoformat(),
+                'external_trigger': True,
+                'start_date': self.now.isoformat(),
+                'conf': {},
+            }
+        )
+
+    def test_should_response_404(self):
+        response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+        assert response.status_code == 404
+        self.assertEqual(
+            {
+                'detail': None,
+                'status': 404,
+                'title': 'DAGRun not found',
+                'type': 'about:blank'
+            },
+            response.json
+        )
 
 
 class TestGetDagRuns(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_should_return_all_with_tilde_as_dag_id(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/~/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_handle_limit_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(100)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 100
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 1)
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID0',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=0)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 1
+            }
+        )
+
+    @provide_session
+    def test_handle_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(4)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 4
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 3)

Review comment:
       I am thinking that we have been handling the total_entries correctly

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,26 +35,471 @@ def setUpClass(cls) -> None:
 
     def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
+        self.now = timezone.utcnow()
+        clear_db_runs()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
     @pytest.mark.skip(reason="Not implemented yet")
     def test_should_response_200(self):
-        response = 
self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+        response = 
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 204
 
 
 class TestGetDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        response = 
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                'dag_id': 'TEST_DAG_ID',
+                'dag_run_id': 'TEST_DAG_RUN_ID',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': self.now.isoformat(),
+                'external_trigger': True,
+                'start_date': self.now.isoformat(),
+                'conf': {},
+            }
+        )
+
+    def test_should_response_404(self):
+        response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+        assert response.status_code == 404
+        self.assertEqual(
+            {
+                'detail': None,
+                'status': 404,
+                'title': 'DAGRun not found',
+                'type': 'about:blank'
+            },
+            response.json
+        )
 
 
 class TestGetDagRuns(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_should_return_all_with_tilde_as_dag_id(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/~/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_handle_limit_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(100)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 100
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 1)
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID0',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=0)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 1
+            }
+        )
+
+    @provide_session
+    def test_handle_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(4)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 4
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 3)

Review comment:
       Look what I found when I ran webserver
   
   **Without Filter Top**
   
![withoutfiltertop](https://user-images.githubusercontent.com/4122866/84039791-af0a4e00-a999-11ea-9e14-dc183ae48d33.png)
   
   **With Filter Top**
   
![withfiltertop](https://user-images.githubusercontent.com/4122866/84039923-db25cf00-a999-11ea-922b-87f76770338a.png)
   
   **Without Filter Bottom**
   
![withoutfilterbottom](https://user-images.githubusercontent.com/4122866/84039990-f1338f80-a999-11ea-9fb1-5d4fdddc3a77.png)
   
   **With Filter Bottom**
   
![withfilterbottom](https://user-images.githubusercontent.com/4122866/84040041-06102300-a99a-11ea-9df0-8b3cc861df71.png)
   
   

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,26 +35,471 @@ def setUpClass(cls) -> None:
 
     def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
+        self.now = timezone.utcnow()
+        clear_db_runs()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
     @pytest.mark.skip(reason="Not implemented yet")
     def test_should_response_200(self):
-        response = 
self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+        response = 
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 204
 
 
 class TestGetDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        response = 
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                'dag_id': 'TEST_DAG_ID',
+                'dag_run_id': 'TEST_DAG_RUN_ID',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': self.now.isoformat(),
+                'external_trigger': True,
+                'start_date': self.now.isoformat(),
+                'conf': {},
+            }
+        )
+
+    def test_should_response_404(self):
+        response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+        assert response.status_code == 404
+        self.assertEqual(
+            {
+                'detail': None,
+                'status': 404,
+                'title': 'DAGRun not found',
+                'type': 'about:blank'
+            },
+            response.json
+        )
 
 
 class TestGetDagRuns(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_should_return_all_with_tilde_as_dag_id(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/~/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_handle_limit_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(100)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 100
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 1)
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID0',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=0)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 1
+            }
+        )
+
+    @provide_session
+    def test_handle_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(4)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 4
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 3)

Review comment:
       Look what I found when I ran webserver
   
   **Without Filter. Total entries is 56**
   
![withoutfiltertop](https://user-images.githubusercontent.com/4122866/84039791-af0a4e00-a999-11ea-9e14-dc183ae48d33.png)
   
   **With Filter. Total entries is 20**
   
![withfiltertop](https://user-images.githubusercontent.com/4122866/84039923-db25cf00-a999-11ea-922b-87f76770338a.png)
   
   **Without Filter Bottom. Total entries is 56**
   
![withoutfilterbottom](https://user-images.githubusercontent.com/4122866/84039990-f1338f80-a999-11ea-9fb1-5d4fdddc3a77.png)
   
   **With Filter Bottom. Total entries is 20**
   
![withfilterbottom](https://user-images.githubusercontent.com/4122866/84040041-06102300-a99a-11ea-9df0-8b3cc861df71.png)
   
   

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,26 +35,471 @@ def setUpClass(cls) -> None:
 
     def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
+        self.now = timezone.utcnow()
+        clear_db_runs()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
     @pytest.mark.skip(reason="Not implemented yet")
     def test_should_response_200(self):
-        response = 
self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+        response = 
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 204
 
 
 class TestGetDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        response = 
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                'dag_id': 'TEST_DAG_ID',
+                'dag_run_id': 'TEST_DAG_RUN_ID',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': self.now.isoformat(),
+                'external_trigger': True,
+                'start_date': self.now.isoformat(),
+                'conf': {},
+            }
+        )
+
+    def test_should_response_404(self):
+        response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+        assert response.status_code == 404
+        self.assertEqual(
+            {
+                'detail': None,
+                'status': 404,
+                'title': 'DAGRun not found',
+                'type': 'about:blank'
+            },
+            response.json
+        )
 
 
 class TestGetDagRuns(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_should_return_all_with_tilde_as_dag_id(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/~/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_handle_limit_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(100)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 100
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 1)
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID0',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=0)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 1
+            }
+        )
+
+    @provide_session
+    def test_handle_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(4)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 4
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 3)

Review comment:
       I will really appreciate if you could take a look at this once more

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -29,26 +35,471 @@ def setUpClass(cls) -> None:
 
     def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
+        self.now = timezone.utcnow()
+        clear_db_runs()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
     @pytest.mark.skip(reason="Not implemented yet")
     def test_should_response_200(self):
-        response = 
self.client.delete("/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+        response = 
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 204
 
 
 class TestGetDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.get("/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        response = 
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID")
         assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                'dag_id': 'TEST_DAG_ID',
+                'dag_run_id': 'TEST_DAG_RUN_ID',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': self.now.isoformat(),
+                'external_trigger': True,
+                'start_date': self.now.isoformat(),
+                'conf': {},
+            }
+        )
+
+    def test_should_response_404(self):
+        response = self.client.get("api/v1/dags/invalid-id/dagRuns/invalid-id")
+        assert response.status_code == 404
+        self.assertEqual(
+            {
+                'detail': None,
+                'status': 404,
+                'title': 'DAGRun not found',
+                'type': 'about:blank'
+            },
+            response.json
+        )
 
 
 class TestGetDagRuns(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
+
+    @provide_session
+    def test_should_response_200(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_should_return_all_with_tilde_as_dag_id(self, session):
+        dagrun_model_1 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_1',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now,
+            start_date=self.now,
+            external_trigger=True,
+        )
+        dagrun_model_2 = DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID_2',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=1),
+            start_date=self.now,
+            external_trigger=True,
+        )
+        session.add_all([dagrun_model_1, dagrun_model_2])
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 2
+        response = self.client.get("api/v1/dags/~/dagRuns")
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.now.isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=1)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 2
+            }
+        )
+
+    @provide_session
+    def test_handle_limit_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(100)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 100
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?limit=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 1)
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID0',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': (self.now + 
timedelta(minutes=0)).isoformat(),
+                        'external_trigger': True,
+                        'start_date': self.now.isoformat(),
+                        'conf': {},
+                    }
+                ],
+                "total_entries": 1
+            }
+        )
+
+    @provide_session
+    def test_handle_offset_in_query(self, session):
+        dagrun_models = [DagRun(
+            dag_id='TEST_DAG_ID',
+            run_id='TEST_DAG_RUN_ID' + str(i),
+            run_type=DagRunType.MANUAL.value,
+            execution_date=self.now + timedelta(minutes=i),
+            start_date=self.now,
+            external_trigger=True,
+        ) for i in range(4)]
+
+        session.add_all(dagrun_models)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 4
+
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns?offset=1"
+        )
+        assert response.status_code == 200
+        self.assertEqual(response.json.get('total_entries'), 3)

Review comment:
       Fixed 
[5096389](https://github.com/apache/airflow/pull/9153/commits/5096389c1447d8100e70fdab7b593bfd4112aaf9)

##########
File path: tests/api_connexion/schemas/test_dag_run_schema.py
##########
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow.api_connexion.schemas.dag_run_schema import (
+    DAGRunCollection, dagrun_collection_schema, dagrun_schema,
+)
+from airflow.models import DagRun
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.types import DagRunType
+from tests.test_utils.db import clear_db_runs
+
+
+class TestDAGRunBase(unittest.TestCase):
+
+    def setUp(self) -> None:
+        clear_db_runs()
+        self.now = timezone.utcnow()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
+
+
+class TestDAGRunSchema(TestDAGRunBase):
+
+    @provide_session
+    def test_serialzie(self, session):
+        dagrun_model = DagRun(run_id='my-dag-run',
+                              run_type=DagRunType.MANUAL.value,
+                              execution_date=self.now,
+                              start_date=self.now,
+                              conf='{"start": "stop"}'
+                              )
+        session.add(dagrun_model)
+        session.commit()
+        dagrun_model = session.query(DagRun).first()
+        deserialized_dagrun = dagrun_schema.dump(dagrun_model)
+
+        self.assertEqual(
+            deserialized_dagrun[0],
+            {
+                'dag_id': None,
+                'dag_run_id': 'my-dag-run',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': str(self.now.isoformat()),
+                'external_trigger': True,
+                'start_date': str(self.now.isoformat()),
+                'conf': '{"start": "stop"}'

Review comment:
       I found that this can solve it for us without any validation code from 
our side:
   
   `app.add_api("openapi.yaml", strict_validation=True)`
   
   https://connexion.readthedocs.io/en/latest/request.html#parameter-validation
   
   Should I add it?

##########
File path: tests/api_connexion/schemas/test_dag_run_schema.py
##########
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow.api_connexion.schemas.dag_run_schema import (
+    DAGRunCollection, dagrun_collection_schema, dagrun_schema,
+)
+from airflow.models import DagRun
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.types import DagRunType
+from tests.test_utils.db import clear_db_runs
+
+
+class TestDAGRunBase(unittest.TestCase):
+
+    def setUp(self) -> None:
+        clear_db_runs()
+        self.now = timezone.utcnow()
+
+    def tearDown(self) -> None:
+        clear_db_runs()
+
+
+class TestDAGRunSchema(TestDAGRunBase):
+
+    @provide_session
+    def test_serialzie(self, session):
+        dagrun_model = DagRun(run_id='my-dag-run',
+                              run_type=DagRunType.MANUAL.value,
+                              execution_date=self.now,
+                              start_date=self.now,
+                              conf='{"start": "stop"}'
+                              )
+        session.add(dagrun_model)
+        session.commit()
+        dagrun_model = session.query(DagRun).first()
+        deserialized_dagrun = dagrun_schema.dump(dagrun_model)
+
+        self.assertEqual(
+            deserialized_dagrun[0],
+            {
+                'dag_id': None,
+                'dag_run_id': 'my-dag-run',
+                'end_date': None,
+                'state': 'running',
+                'execution_date': str(self.now.isoformat()),
+                'external_trigger': True,
+                'start_date': str(self.now.isoformat()),
+                'conf': '{"start": "stop"}'

Review comment:
       I found that this can solve it for us without any validation code from 
our side:
   
   `app.add_api("openapi.yaml", strict_validation=True)`
   
   https://connexion.readthedocs.io/en/latest/request.html#parameter-validation
   
   Should I add it?

##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -26,18 +34,100 @@ def delete_dag_run():
     raise NotImplementedError("Not implemented yet.")
 
 
-def get_dag_run():
+@provide_session
+def get_dag_run(dag_id, dag_run_id, session):
     """
     Get a DAG Run.
     """
-    raise NotImplementedError("Not implemented yet.")
+    query = session.query(DagRun)
+    query = query.filter(DagRun.dag_id == dag_id)
+    query = query.filter(DagRun.run_id == dag_run_id)
+    dag_run = query.one_or_none()
+    if dag_run is None:
+        raise NotFound("DAGRun not found")
+    return dagrun_schema.dump(dag_run)
 
 
-def get_dag_runs():
+@provide_session
+def get_dag_runs(dag_id, session):
     """
     Get all DAG Runs.
     """
-    raise NotImplementedError("Not implemented yet.")
+    offset = request.args.get(parameters.page_offset, 0)
+    limit = min(int(request.args.get(parameters.page_limit, 100)), 100)
+
+    start_date_gte = parse_datetime_in_query(

Review comment:
       I found that this can solve it for us without any validation code from 
our side:
   
   app.add_api("openapi.yaml", strict_validation=True)
   
   https://connexion.readthedocs.io/en/latest/request.html#parameter-validation
   
   Should I add it?

##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -26,18 +34,100 @@ def delete_dag_run():
     raise NotImplementedError("Not implemented yet.")
 
 
-def get_dag_run():
+@provide_session
+def get_dag_run(dag_id, dag_run_id, session):
     """
     Get a DAG Run.
     """
-    raise NotImplementedError("Not implemented yet.")
+    query = session.query(DagRun)
+    query = query.filter(DagRun.dag_id == dag_id)
+    query = query.filter(DagRun.run_id == dag_run_id)
+    dag_run = query.one_or_none()
+    if dag_run is None:
+        raise NotFound("DAGRun not found")
+    return dagrun_schema.dump(dag_run)
 
 
-def get_dag_runs():
+@provide_session
+def get_dag_runs(dag_id, session):
     """
     Get all DAG Runs.
     """
-    raise NotImplementedError("Not implemented yet.")
+    offset = request.args.get(parameters.page_offset, 0)
+    limit = min(int(request.args.get(parameters.page_limit, 100)), 100)
+
+    start_date_gte = parse_datetime_in_query(

Review comment:
       I found that this can solve it for us without any validation code from 
our side:
   
   app.add_api("openapi.yaml", strict_validation=True)
   
   https://connexion.readthedocs.io/en/latest/request.html#parameter-validation




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to