Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0f4d681f6 -> bb0d9ee19
[AIRFLOW-2550] Implements API endpoint to list DAG runs

Closes #3499 from verdan/AIRFLOW-2550-list-dagruns

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/bb0d9ee1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/bb0d9ee1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/bb0d9ee1

Branch: refs/heads/master
Commit: bb0d9ee194b06575032b3fd589c326a254f4a9cb
Parents: 0f4d681
Author: Verdan Mahmood <[email protected]>
Authored: Fri Jun 15 11:20:03 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Fri Jun 15 11:20:03 2018 +0200

----------------------------------------------------------------------
 airflow/api/common/experimental/get_dag_runs.py |  55 ++++++++
 airflow/www_rbac/api/experimental/endpoints.py  |  24 ++++
 .../api/experimental/test_dag_runs_endpoint.py  | 129 +++++++++++++++++++
 3 files changed, 208 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb0d9ee1/airflow/api/common/experimental/get_dag_runs.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/get_dag_runs.py b/airflow/api/common/experimental/get_dag_runs.py
new file mode 100644
index 0000000..63b1f99
--- /dev/null
+++ b/airflow/api/common/experimental/get_dag_runs.py
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from flask import url_for
+
+from airflow.exceptions import AirflowException
+from airflow.models import DagBag, DagRun
+
+
+def get_dag_runs(dag_id, state=None):
+    """
+    Returns a list of Dag Runs for a specific DAG ID.
+    :param dag_id: String identifier of a DAG
+    :param state: queued|running|success...
+    :return: List of DAG runs of a DAG with requested state,
+    or all runs if the state is not specified
+    """
+    dagbag = DagBag()
+
+    # Check DAG exists.
+    if dag_id not in dagbag.dags:
+        error_message = "Dag id {} not found".format(dag_id)
+        raise AirflowException(error_message)
+
+    dag_runs = list()
+    state = state.lower() if state else None
+    for run in DagRun.find(dag_id=dag_id, state=state):
+        dag_runs.append({
+            'id': run.id,
+            'run_id': run.run_id,
+            'state': run.state,
+            'dag_id': run.dag_id,
+            'execution_date': run.execution_date.isoformat(),
+            'start_date': ((run.start_date or '') and
+                           run.start_date.isoformat()),
+            'dag_run_url': url_for('Airflow.graph', dag_id=run.dag_id,
+                                   execution_date=run.execution_date)
+        })
+
+    return dag_runs

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb0d9ee1/airflow/www_rbac/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/api/experimental/endpoints.py b/airflow/www_rbac/api/experimental/endpoints.py
index 693feec..a594b38 100644
--- a/airflow/www_rbac/api/experimental/endpoints.py
+++ b/airflow/www_rbac/api/experimental/endpoints.py
@@ -20,6 +20,7 @@ import airflow.api
 
 from airflow.api.common.experimental import pool as pool_api
 from airflow.api.common.experimental import trigger_dag as trigger
+from airflow.api.common.experimental.get_dag_runs import get_dag_runs
 from airflow.api.common.experimental.get_task import get_task
 from airflow.api.common.experimental.get_task_instance import get_task_instance
 from airflow.api.common.experimental.get_dag_run_state import get_dag_run_state
@@ -90,6 +91,28 @@ def trigger_dag(dag_id):
     return response
 
 
+@api_experimental.route('/dags/<string:dag_id>/dag_runs', methods=['GET'])
+@requires_authentication
+def dag_runs(dag_id):
+    """
+    Returns a list of Dag Runs for a specific DAG ID.
+    :query param state: a query string parameter '?state=queued|running|success...'
+    :param dag_id: String identifier of a DAG
+    :return: List of DAG runs of a DAG with requested state,
+    or all runs if the state is not specified
+    """
+    try:
+        state = request.args.get('state')
+        dagruns = get_dag_runs(dag_id, state)
+    except AirflowException as err:
+        _log.info(err)
+        response = jsonify(error="{}".format(err))
+        response.status_code = 400
+        return response
+
+    return jsonify(dagruns)
+
+
 @api_experimental.route('/test', methods=['GET'])
 @requires_authentication
 def test():
@@ -115,6 +138,7 @@ def task_info(dag_id, task_id):
     return jsonify(fields)
 
 
+# ToDo: Shouldn't this be a PUT method?
 @api_experimental.route('/dags/<string:dag_id>/paused/<string:paused>', methods=['GET'])
 @requires_authentication
 def dag_paused(dag_id, paused):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb0d9ee1/tests/www_rbac/api/experimental/test_dag_runs_endpoint.py
----------------------------------------------------------------------
diff --git a/tests/www_rbac/api/experimental/test_dag_runs_endpoint.py b/tests/www_rbac/api/experimental/test_dag_runs_endpoint.py
new file mode 100644
index 0000000..88f694d
--- /dev/null
+++ b/tests/www_rbac/api/experimental/test_dag_runs_endpoint.py
@@ -0,0 +1,129 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+import unittest
+
+from airflow import configuration
+from airflow.api.common.experimental.trigger_dag import trigger_dag
+from airflow.models import DagRun
+from airflow.settings import Session
+from airflow.www_rbac import app as application
+
+
+class TestDagRunsEndpoint(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestDagRunsEndpoint, cls).setUpClass()
+        session = Session()
+        session.query(DagRun).delete()
+        session.commit()
+        session.close()
+
+    def setUp(self):
+        super(TestDagRunsEndpoint, self).setUp()
+        configuration.load_test_config()
+        app, _ = application.create_app(testing=True)
+        self.app = app.test_client()
+
+    def tearDown(self):
+        session = Session()
+        session.query(DagRun).delete()
+        session.commit()
+        session.close()
+        super(TestDagRunsEndpoint, self).tearDown()
+
+    def test_get_dag_runs_success(self):
+        url_template = '/api/experimental/dags/{}/dag_runs'
+        dag_id = 'example_bash_operator'
+        # Create DagRun
+        dag_run = trigger_dag(dag_id=dag_id, run_id='test_get_dag_runs_success')
+
+        response = self.app.get(url_template.format(dag_id))
+        self.assertEqual(200, response.status_code)
+        data = json.loads(response.data.decode('utf-8'))
+
+        self.assertIsInstance(data, list)
+        self.assertEqual(len(data), 1)
+        self.assertEqual(data[0]['dag_id'], dag_id)
+        self.assertEqual(data[0]['id'], dag_run.id)
+
+    def test_get_dag_runs_success_with_state_parameter(self):
+        url_template = '/api/experimental/dags/{}/dag_runs?state=running'
+        dag_id = 'example_bash_operator'
+        # Create DagRun
+        dag_run = trigger_dag(dag_id=dag_id, run_id='test_get_dag_runs_success')
+
+        response = self.app.get(url_template.format(dag_id))
+        self.assertEqual(200, response.status_code)
+        data = json.loads(response.data.decode('utf-8'))
+
+        self.assertIsInstance(data, list)
+        self.assertEqual(len(data), 1)
+        self.assertEqual(data[0]['dag_id'], dag_id)
+        self.assertEqual(data[0]['id'], dag_run.id)
+
+    def test_get_dag_runs_success_with_capital_state_parameter(self):
+        url_template = '/api/experimental/dags/{}/dag_runs?state=RUNNING'
+        dag_id = 'example_bash_operator'
+        # Create DagRun
+        dag_run = trigger_dag(dag_id=dag_id, run_id='test_get_dag_runs_success')
+
+        response = self.app.get(url_template.format(dag_id))
+        self.assertEqual(200, response.status_code)
+        data = json.loads(response.data.decode('utf-8'))
+
+        self.assertIsInstance(data, list)
+        self.assertEqual(len(data), 1)
+        self.assertEqual(data[0]['dag_id'], dag_id)
+        self.assertEqual(data[0]['id'], dag_run.id)
+
+    def test_get_dag_runs_success_with_state_no_result(self):
+        url_template = '/api/experimental/dags/{}/dag_runs?state=dummy'
+        dag_id = 'example_bash_operator'
+        # Create DagRun
+        trigger_dag(dag_id=dag_id, run_id='test_get_dag_runs_success')
+
+        response = self.app.get(url_template.format(dag_id))
+        self.assertEqual(200, response.status_code)
+        data = json.loads(response.data.decode('utf-8'))
+
+        self.assertIsInstance(data, list)
+        self.assertEqual(len(data), 0)
+
+    def test_get_dag_runs_invalid_dag_id(self):
+        url_template = '/api/experimental/dags/{}/dag_runs'
+        dag_id = 'DUMMY_DAG'
+
+        response = self.app.get(url_template.format(dag_id))
+        self.assertEqual(400, response.status_code)
+        data = json.loads(response.data.decode('utf-8'))
+
+        self.assertNotIsInstance(data, list)
+
+    def test_get_dag_runs_no_runs(self):
+        url_template = '/api/experimental/dags/{}/dag_runs'
+        dag_id = 'example_bash_operator'
+
+        response = self.app.get(url_template.format(dag_id))
+        self.assertEqual(200, response.status_code)
+        data = json.loads(response.data.decode('utf-8'))
+
+        self.assertIsInstance(data, list)
+        self.assertEqual(len(data), 0)
