Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0f4d681f6 -> bb0d9ee19


[AIRFLOW-2550] Implements API endpoint to list DAG runs

Closes #3499 from verdan/AIRFLOW-2550-list-dagruns


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/bb0d9ee1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/bb0d9ee1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/bb0d9ee1

Branch: refs/heads/master
Commit: bb0d9ee194b06575032b3fd589c326a254f4a9cb
Parents: 0f4d681
Author: Verdan Mahmood <[email protected]>
Authored: Fri Jun 15 11:20:03 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Fri Jun 15 11:20:03 2018 +0200

----------------------------------------------------------------------
 airflow/api/common/experimental/get_dag_runs.py |  55 ++++++++
 airflow/www_rbac/api/experimental/endpoints.py  |  24 ++++
 .../api/experimental/test_dag_runs_endpoint.py  | 129 +++++++++++++++++++
 3 files changed, 208 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb0d9ee1/airflow/api/common/experimental/get_dag_runs.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/get_dag_runs.py b/airflow/api/common/experimental/get_dag_runs.py
new file mode 100644
index 0000000..63b1f99
--- /dev/null
+++ b/airflow/api/common/experimental/get_dag_runs.py
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from flask import url_for
+
+from airflow.exceptions import AirflowException
+from airflow.models import DagBag, DagRun
+
+
+def get_dag_runs(dag_id, state=None):
+    """
+    Collect the runs of a single DAG as a list of plain dicts.
+
+    :param dag_id: String identifier of a DAG
+    :param state: queued|running|success... (lower-cased before use)
+    :return: List of DAG runs of a DAG with requested state,
+        or all runs if the state is not specified
+    :raises AirflowException: if dag_id is not present in the DagBag
+    """
+    # Check DAG exists before querying for its runs.
+    if dag_id not in DagBag().dags:
+        raise AirflowException("Dag id {} not found".format(dag_id))
+
+    # A falsy state is passed on as None; anything else is lower-cased.
+    wanted_state = state.lower() if state else None
+    found_runs = DagRun.find(dag_id=dag_id, state=wanted_state)
+
+    # Serialize each run; datetimes are rendered through isoformat().
+    return [{
+        'id': run.id,
+        'run_id': run.run_id,
+        'state': run.state,
+        'dag_id': run.dag_id,
+        'execution_date': run.execution_date.isoformat(),
+        # A run without a start date reports an empty string.
+        'start_date': (run.start_date.isoformat()
+                       if run.start_date else ''),
+        'dag_run_url': url_for('Airflow.graph', dag_id=run.dag_id,
+                               execution_date=run.execution_date),
+    } for run in found_runs]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb0d9ee1/airflow/www_rbac/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/api/experimental/endpoints.py b/airflow/www_rbac/api/experimental/endpoints.py
index 693feec..a594b38 100644
--- a/airflow/www_rbac/api/experimental/endpoints.py
+++ b/airflow/www_rbac/api/experimental/endpoints.py
@@ -20,6 +20,7 @@ import airflow.api
 
 from airflow.api.common.experimental import pool as pool_api
 from airflow.api.common.experimental import trigger_dag as trigger
+from airflow.api.common.experimental.get_dag_runs import get_dag_runs
 from airflow.api.common.experimental.get_task import get_task
 from airflow.api.common.experimental.get_task_instance import get_task_instance
 from airflow.api.common.experimental.get_dag_run_state import get_dag_run_state
@@ -90,6 +91,28 @@ def trigger_dag(dag_id):
     return response
 
 
+@api_experimental.route('/dags/<string:dag_id>/dag_runs', methods=['GET'])
+@requires_authentication
+def dag_runs(dag_id):
+    """
+    List the DAG runs of one DAG, optionally filtered by state.
+
+    :param dag_id: String identifier of a DAG
+    :query param state: optional '?state=queued|running|success...'
+    :return: JSON list of DAG runs, or a JSON ``error`` object with
+        status 400 when the lookup raises an AirflowException
+    """
+    requested_state = request.args.get('state')
+    try:
+        runs = get_dag_runs(dag_id, requested_state)
+    except AirflowException as exc:
+        _log.info(exc)
+        failure = jsonify(error="{}".format(exc))
+        failure.status_code = 400
+        return failure
+    return jsonify(runs)
+
+
 @api_experimental.route('/test', methods=['GET'])
 @requires_authentication
 def test():
@@ -115,6 +138,7 @@ def task_info(dag_id, task_id):
     return jsonify(fields)
 
 
+# ToDo: Shouldn't this be a PUT method?
 @api_experimental.route('/dags/<string:dag_id>/paused/<string:paused>', methods=['GET'])
 @requires_authentication
 def dag_paused(dag_id, paused):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb0d9ee1/tests/www_rbac/api/experimental/test_dag_runs_endpoint.py
----------------------------------------------------------------------
diff --git a/tests/www_rbac/api/experimental/test_dag_runs_endpoint.py b/tests/www_rbac/api/experimental/test_dag_runs_endpoint.py
new file mode 100644
index 0000000..88f694d
--- /dev/null
+++ b/tests/www_rbac/api/experimental/test_dag_runs_endpoint.py
@@ -0,0 +1,129 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+import unittest
+
+from airflow import configuration
+from airflow.api.common.experimental.trigger_dag import trigger_dag
+from airflow.models import DagRun
+from airflow.settings import Session
+from airflow.www_rbac import app as application
+
+
+class TestDagRunsEndpoint(unittest.TestCase):
+    """
+    Tests for the experimental RBAC endpoint that lists DAG runs:
+    GET /api/experimental/dags/<dag_id>/dag_runs
+    """
+
+    @staticmethod
+    def _purge_dag_runs():
+        """Delete every DagRun row so each test starts from a clean table."""
+        session = Session()
+        session.query(DagRun).delete()
+        session.commit()
+        session.close()
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestDagRunsEndpoint, cls).setUpClass()
+        cls._purge_dag_runs()
+
+    def setUp(self):
+        super(TestDagRunsEndpoint, self).setUp()
+        configuration.load_test_config()
+        flask_app, _ = application.create_app(testing=True)
+        self.app = flask_app.test_client()
+
+    def tearDown(self):
+        self._purge_dag_runs()
+        super(TestDagRunsEndpoint, self).tearDown()
+
+    def _get_json(self, url_template, dag_id, expected_status):
+        """GET the formatted URL, check the status, return decoded JSON."""
+        response = self.app.get(url_template.format(dag_id))
+        self.assertEqual(expected_status, response.status_code)
+        return json.loads(response.data.decode('utf-8'))
+
+    def _assert_single_run(self, payload, dag_id, dag_run):
+        """Assert the payload is a one-element list describing dag_run."""
+        self.assertIsInstance(payload, list)
+        self.assertEqual(len(payload), 1)
+        self.assertEqual(payload[0]['dag_id'], dag_id)
+        self.assertEqual(payload[0]['id'], dag_run.id)
+
+    def _assert_no_runs(self, payload):
+        """Assert the payload is an empty list."""
+        self.assertIsInstance(payload, list)
+        self.assertEqual(len(payload), 0)
+
+    def test_get_dag_runs_success(self):
+        dag_id = 'example_bash_operator'
+        # Create DagRun
+        dag_run = trigger_dag(dag_id=dag_id,
+                              run_id='test_get_dag_runs_success')
+
+        payload = self._get_json(
+            '/api/experimental/dags/{}/dag_runs', dag_id, 200)
+        self._assert_single_run(payload, dag_id, dag_run)
+
+    def test_get_dag_runs_success_with_state_parameter(self):
+        dag_id = 'example_bash_operator'
+        # The freshly triggered run is expected to match state=running.
+        dag_run = trigger_dag(dag_id=dag_id,
+                              run_id='test_get_dag_runs_success')
+
+        payload = self._get_json(
+            '/api/experimental/dags/{}/dag_runs?state=running', dag_id, 200)
+        self._assert_single_run(payload, dag_id, dag_run)
+
+    def test_get_dag_runs_success_with_capital_state_parameter(self):
+        dag_id = 'example_bash_operator'
+        # The upper-case spelling of the state is expected to match too.
+        dag_run = trigger_dag(dag_id=dag_id,
+                              run_id='test_get_dag_runs_success')
+
+        payload = self._get_json(
+            '/api/experimental/dags/{}/dag_runs?state=RUNNING', dag_id, 200)
+        self._assert_single_run(payload, dag_id, dag_run)
+
+    def test_get_dag_runs_success_with_state_no_result(self):
+        dag_id = 'example_bash_operator'
+        # A run exists, but the 'dummy' state filter should match nothing.
+        trigger_dag(dag_id=dag_id, run_id='test_get_dag_runs_success')
+
+        payload = self._get_json(
+            '/api/experimental/dags/{}/dag_runs?state=dummy', dag_id, 200)
+        self._assert_no_runs(payload)
+
+    def test_get_dag_runs_invalid_dag_id(self):
+        dag_id = 'DUMMY_DAG'
+
+        # An unknown DAG id yields a 400 with a non-list JSON body.
+        payload = self._get_json(
+            '/api/experimental/dags/{}/dag_runs', dag_id, 400)
+        self.assertNotIsInstance(payload, list)
+
+    def test_get_dag_runs_no_runs(self):
+        dag_id = 'example_bash_operator'
+
+        # No run was triggered, so the listing must be empty.
+        payload = self._get_json(
+            '/api/experimental/dags/{}/dag_runs', dag_id, 200)
+        self._assert_no_runs(payload)

Reply via email to