Repository: incubator-airflow Updated Branches: refs/heads/master 8c4131ba3 -> 5676ec79d
[AIRFLOW-2608] Implements/Standardize custom exceptions for experimental APIs Implements/Standardize custom exceptions for experimental APIs Implements/Standardize custom exceptions for experimental APIs Closes #3496 from verdan/AIRFLOW-2608-api- exceptions-handling Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5676ec79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5676ec79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5676ec79 Branch: refs/heads/master Commit: 5676ec79d5f55f1c33383a7aa88cb3aed2a8d07e Parents: 8c4131b Author: Verdan Mahmood <[email protected]> Authored: Tue Jun 19 10:27:14 2018 +0200 Committer: Fokko Driesprong <[email protected]> Committed: Tue Jun 19 10:27:14 2018 +0200 ---------------------------------------------------------------------- airflow/api/common/experimental/delete_dag.py | 12 ++--- .../common/experimental/get_dag_run_state.py | 6 +-- airflow/api/common/experimental/get_task.py | 6 +-- .../common/experimental/get_task_instance.py | 11 +++-- airflow/api/common/experimental/pool.py | 18 ++----- airflow/api/common/experimental/trigger_dag.py | 6 +-- airflow/exceptions.py | 51 +++++++++++++++++++- airflow/www/api/experimental/endpoints.py | 46 +++++++++--------- airflow/www_rbac/api/experimental/endpoints.py | 40 +++++++-------- tests/api/common/experimental/test_pool.py | 17 ++++--- 10 files changed, 125 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/api/common/experimental/delete_dag.py ---------------------------------------------------------------------- diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/experimental/delete_dag.py index a37ac96..b9ce736 100644 --- a/airflow/api/common/experimental/delete_dag.py +++ b/airflow/api/common/experimental/delete_dag.py @@ -17,17 +17,10 @@ # specific language governing permissions and limitations # under the License. -from airflow import models, settings -from airflow.exceptions import AirflowException from sqlalchemy import or_ - -class DagFileExists(AirflowException): - status = 400 - - -class DagNotFound(AirflowException): - status = 404 +from airflow import models, settings +from airflow.exceptions import DagNotFound, DagFileExists def delete_dag(dag_id): @@ -45,6 +38,7 @@ def delete_dag(dag_id): count = 0 + # noinspection PyUnresolvedReferences,PyProtectedMember for m in models.Base._decl_class_registry.values(): if hasattr(m, "dag_id"): cond = or_(m.dag_id == dag_id, m.dag_id.like(dag_id + ".%")) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/api/common/experimental/get_dag_run_state.py ---------------------------------------------------------------------- diff --git a/airflow/api/common/experimental/get_dag_run_state.py b/airflow/api/common/experimental/get_dag_run_state.py index a7bd131..a37aff6 100644 --- a/airflow/api/common/experimental/get_dag_run_state.py +++ b/airflow/api/common/experimental/get_dag_run_state.py @@ -17,7 +17,7 @@ # specific language governing permissions and limitations # under the License. -from airflow.exceptions import AirflowException +from airflow.exceptions import DagNotFound, DagRunNotFound from airflow.models import DagBag @@ -29,7 +29,7 @@ def get_dag_run_state(dag_id, execution_date): # Check DAG exists. if dag_id not in dagbag.dags: error_message = "Dag id {} not found".format(dag_id) - raise AirflowException(error_message) + raise DagNotFound(error_message) # Get DAG object and check Task Exists dag = dagbag.get_dag(dag_id) @@ -39,6 +39,6 @@ def get_dag_run_state(dag_id, execution_date): if not dagrun: error_message = ('Dag Run for date {} not found in dag {}' .format(execution_date, dag_id)) - raise AirflowException(error_message) + raise DagRunNotFound(error_message) return {'state': dagrun.get_state()} http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/api/common/experimental/get_task.py ---------------------------------------------------------------------- diff --git a/airflow/api/common/experimental/get_task.py b/airflow/api/common/experimental/get_task.py index 472441d..96aa35a 100644 --- a/airflow/api/common/experimental/get_task.py +++ b/airflow/api/common/experimental/get_task.py @@ -17,7 +17,7 @@ # specific language governing permissions and limitations # under the License. -from airflow.exceptions import AirflowException +from airflow.exceptions import DagNotFound, TaskNotFound from airflow.models import DagBag @@ -28,13 +28,13 @@ def get_task(dag_id, task_id): # Check DAG exists. if dag_id not in dagbag.dags: error_message = "Dag id {} not found".format(dag_id) - raise AirflowException(error_message) + raise DagNotFound(error_message) # Get DAG object and check Task Exists dag = dagbag.get_dag(dag_id) if not dag.has_task(task_id): error_message = 'Task {} not found in dag {}'.format(task_id, dag_id) - raise AirflowException(error_message) + raise TaskNotFound(error_message) # Return the task. return dag.get_task(task_id) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/api/common/experimental/get_task_instance.py ---------------------------------------------------------------------- diff --git a/airflow/api/common/experimental/get_task_instance.py b/airflow/api/common/experimental/get_task_instance.py index cada270..c9f59cd 100644 --- a/airflow/api/common/experimental/get_task_instance.py +++ b/airflow/api/common/experimental/get_task_instance.py @@ -17,7 +17,8 @@ # specific language governing permissions and limitations # under the License. -from airflow.exceptions import AirflowException +from airflow.exceptions import (DagNotFound, TaskNotFound, + DagRunNotFound, TaskInstanceNotFound) from airflow.models import DagBag @@ -29,26 +30,26 @@ def get_task_instance(dag_id, task_id, execution_date): # Check DAG exists. if dag_id not in dagbag.dags: error_message = "Dag id {} not found".format(dag_id) - raise AirflowException(error_message) + raise DagNotFound(error_message) # Get DAG object and check Task Exists dag = dagbag.get_dag(dag_id) if not dag.has_task(task_id): error_message = 'Task {} not found in dag {}'.format(task_id, dag_id) - raise AirflowException(error_message) + raise TaskNotFound(error_message) # Get DagRun object and check that it exists dagrun = dag.get_dagrun(execution_date=execution_date) if not dagrun: error_message = ('Dag Run for date {} not found in dag {}' .format(execution_date, dag_id)) - raise AirflowException(error_message) + raise DagRunNotFound(error_message) # Get task instance object and check that it exists task_instance = dagrun.get_task_instance(task_id) if not task_instance: error_message = ('Task {} instance for date {} not found' .format(task_id, execution_date)) - raise AirflowException(error_message) + raise TaskInstanceNotFound(error_message) return task_instance http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/api/common/experimental/pool.py ---------------------------------------------------------------------- diff --git a/airflow/api/common/experimental/pool.py b/airflow/api/common/experimental/pool.py index 57a6d97..f125036 100644 --- a/airflow/api/common/experimental/pool.py +++ b/airflow/api/common/experimental/pool.py @@ -17,24 +17,16 @@ # specific language governing permissions and limitations # under the License. -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowBadRequest, PoolNotFound from airflow.models import Pool from airflow.utils.db import provide_session -class PoolBadRequest(AirflowException): - status = 400 - - -class PoolNotFound(AirflowException): - status = 404 - - @provide_session def get_pool(name, session=None): """Get pool by a given name.""" if not (name and name.strip()): - raise PoolBadRequest("Pool name shouldn't be empty") + raise AirflowBadRequest("Pool name shouldn't be empty") pool = session.query(Pool).filter_by(pool=name).first() if pool is None: @@ -53,12 +45,12 @@ def get_pools(session=None): def create_pool(name, slots, description, session=None): """Create a pool with a given parameters.""" if not (name and name.strip()): - raise PoolBadRequest("Pool name shouldn't be empty") + raise AirflowBadRequest("Pool name shouldn't be empty") try: slots = int(slots) except ValueError: - raise PoolBadRequest("Bad value for `slots`: %s" % slots) + raise AirflowBadRequest("Bad value for `slots`: %s" % slots) session.expire_on_commit = False pool = session.query(Pool).filter_by(pool=name).first() @@ -78,7 +70,7 @@ def create_pool(name, slots, description, session=None): def delete_pool(name, session=None): """Delete pool by a given name.""" if not (name and name.strip()): - raise PoolBadRequest("Pool name shouldn't be empty") + raise AirflowBadRequest("Pool name shouldn't be empty") pool = session.query(Pool).filter_by(pool=name).first() if pool is None: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/api/common/experimental/trigger_dag.py ---------------------------------------------------------------------- diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py index fd9b51f..86be6aa 100644 --- a/airflow/api/common/experimental/trigger_dag.py +++ b/airflow/api/common/experimental/trigger_dag.py @@ -19,7 +19,7 @@ import json -from airflow.exceptions import AirflowException +from airflow.exceptions import DagRunAlreadyExists, DagNotFound from airflow.models import DagRun, DagBag from airflow.utils import timezone from airflow.utils.state import State @@ -35,7 +35,7 @@ def _trigger_dag( replace_microseconds, ): if dag_id not in dag_bag.dags: - raise AirflowException("Dag id {} not found".format(dag_id)) + raise DagNotFound("Dag id {} not found".format(dag_id)) dag = dag_bag.get_dag(dag_id) @@ -52,7 +52,7 @@ def _trigger_dag( dr = dag_run.find(dag_id=dag_id, run_id=run_id) if dr: - raise AirflowException("Run id {} already exists for dag id {}".format( + raise DagRunAlreadyExists("Run id {} already exists for dag id {}".format( run_id, dag_id )) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/exceptions.py ---------------------------------------------------------------------- diff --git a/airflow/exceptions.py b/airflow/exceptions.py index 96e58b4..89f3d0e 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -22,7 +22,21 @@ class AirflowException(Exception): - pass + """ + Base class for all Airflow's errors. + Each custom exception should be derived from this class + """ + status_code = 500 + + +class AirflowBadRequest(AirflowException): + """Raise when the application or server cannot handle the request""" + status_code = 400 + + +class AirflowNotFoundException(AirflowException): + """Raise when the requested object/resource is not available in the system""" + status_code = 404 class AirflowConfigException(AirflowException): @@ -47,3 +61,38 @@ class AirflowSkipException(AirflowException): class AirflowDagCycleException(AirflowException): pass + + +class DagNotFound(AirflowNotFoundException): + """Raise when a DAG is not available in the system""" + pass + + +class DagRunNotFound(AirflowNotFoundException): + """Raise when a DAG Run is not available in the system""" + pass + + +class DagRunAlreadyExists(AirflowBadRequest): + """Raise when creating a DAG run for DAG which already has DAG run entry""" + pass + + +class DagFileExists(AirflowBadRequest): + """Raise when a DAG ID is still in DagBag i.e., DAG file is in DAG folder""" + pass + + +class TaskNotFound(AirflowNotFoundException): + """Raise when a Task is not available in the system""" + pass + + +class TaskInstanceNotFound(AirflowNotFoundException): + """Raise when a Task Instance is not available in the system""" + pass + + +class PoolNotFound(AirflowNotFoundException): + """Raise when a Pool is not available in the system""" + pass http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/www/api/experimental/endpoints.py ---------------------------------------------------------------------- diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py index 1778000..f0bc319 100644 --- a/airflow/www/api/experimental/endpoints.py +++ b/airflow/www/api/experimental/endpoints.py @@ -79,7 +79,7 @@ def trigger_dag(dag_id): except AirflowException as err: _log.error(err) response = jsonify(error="{}".format(err)) - response.status_code = 404 + response.status_code = err.status_code return response if getattr(g, 'user', None): @@ -98,10 +98,10 @@ def delete_dag(dag_id): """ try: count = delete.delete_dag(dag_id) - except AirflowException as e: - _log.error(e) - response = jsonify(error="{}".format(e)) - response.status_code = getattr(e, 'status', 500) + except AirflowException as err: + _log.error(err) + response = jsonify(error="{}".format(err)) + response.status_code = err.status_code return response return jsonify(message="Removed {} record(s)".format(count), count=count) @@ -121,7 +121,7 @@ def task_info(dag_id, task_id): except AirflowException as err: _log.info(err) response = jsonify(error="{}".format(err)) - response.status_code = 404 + response.status_code = err.status_code return response # JSONify and return. @@ -162,7 +162,7 @@ def task_instance_info(dag_id, execution_date, task_id): except AirflowException as err: _log.info(err) response = jsonify(error="{}".format(err)) - response.status_code = 404 + response.status_code = err.status_code return response # JSONify and return. @@ -198,10 +198,10 @@ def get_pool(name): """Get pool by a given name.""" try: pool = pool_api.get_pool(name=name) - except AirflowException as e: - _log.error(e) - response = jsonify(error="{}".format(e)) - response.status_code = getattr(e, 'status', 500) + except AirflowException as err: + _log.error(err) + response = jsonify(error="{}".format(err)) + response.status_code = err.status_code return response else: return jsonify(pool.to_json()) @@ -213,10 +213,10 @@ def get_pools(): """Get all pools.""" try: pools = pool_api.get_pools() - except AirflowException as e: - _log.error(e) - response = jsonify(error="{}".format(e)) - response.status_code = getattr(e, 'status', 500) + except AirflowException as err: + _log.error(err) + response = jsonify(error="{}".format(err)) + response.status_code = err.status_code return response else: return jsonify([p.to_json() for p in pools]) @@ -230,10 +230,10 @@ def create_pool(): params = request.get_json(force=True) try: pool = pool_api.create_pool(**params) - except AirflowException as e: - _log.error(e) - response = jsonify(error="{}".format(e)) - response.status_code = getattr(e, 'status', 500) + except AirflowException as err: + _log.error(err) + response = jsonify(error="{}".format(err)) + response.status_code = err.status_code return response else: return jsonify(pool.to_json()) @@ -246,10 +246,10 @@ def delete_pool(name): """Delete pool.""" try: pool = pool_api.delete_pool(name=name) - except AirflowException as e: - _log.error(e) - response = jsonify(error="{}".format(e)) - response.status_code = getattr(e, 'status', 500) + except AirflowException as err: + _log.error(err) + response = jsonify(error="{}".format(err)) + response.status_code = err.status_code return response else: return jsonify(pool.to_json()) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/www_rbac/api/experimental/endpoints.py ---------------------------------------------------------------------- diff --git a/airflow/www_rbac/api/experimental/endpoints.py b/airflow/www_rbac/api/experimental/endpoints.py index a594b38..8663bad 100644 --- a/airflow/www_rbac/api/experimental/endpoints.py +++ b/airflow/www_rbac/api/experimental/endpoints.py @@ -81,7 +81,7 @@ def trigger_dag(dag_id): except AirflowException as err: _log.error(err) response = jsonify(error="{}".format(err)) - response.status_code = 404 + response.status_code = err.status_code return response if getattr(g, 'user', None): @@ -128,7 +128,7 @@ def task_info(dag_id, task_id): except AirflowException as err: _log.info(err) response = jsonify(error="{}".format(err)) - response.status_code = 404 + response.status_code = err.status_code return response # JSONify and return. @@ -191,7 +191,7 @@ def task_instance_info(dag_id, execution_date, task_id): except AirflowException as err: _log.info(err) response = jsonify(error="{}".format(err)) - response.status_code = 404 + response.status_code = err.status_code return response # JSONify and return. @@ -232,7 +232,7 @@ def dag_run_status(dag_id, execution_date): except AirflowException as err: _log.info(err) response = jsonify(error="{}".format(err)) - response.status_code = 404 + response.status_code = err.status_code return response return jsonify(info) @@ -263,10 +263,10 @@ def get_pool(name): """Get pool by a given name.""" try: pool = pool_api.get_pool(name=name) - except AirflowException as e: - _log.error(e) - response = jsonify(error="{}".format(e)) - response.status_code = getattr(e, 'status', 500) + except AirflowException as err: + _log.error(err) + response = jsonify(error="{}".format(err)) + response.status_code = err.status_code return response else: return jsonify(pool.to_json()) @@ -278,10 +278,10 @@ def get_pools(): """Get all pools.""" try: pools = pool_api.get_pools() - except AirflowException as e: - _log.error(e) - response = jsonify(error="{}".format(e)) - response.status_code = getattr(e, 'status', 500) + except AirflowException as err: + _log.error(err) + response = jsonify(error="{}".format(err)) + response.status_code = err.status_code return response else: return jsonify([p.to_json() for p in pools]) @@ -295,10 +295,10 @@ def create_pool(): params = request.get_json(force=True) try: pool = pool_api.create_pool(**params) - except AirflowException as e: - _log.error(e) - response = jsonify(error="{}".format(e)) - response.status_code = getattr(e, 'status', 500) + except AirflowException as err: + _log.error(err) + response = jsonify(error="{}".format(err)) + response.status_code = err.status_code return response else: return jsonify(pool.to_json()) @@ -311,10 +311,10 @@ def delete_pool(name): """Delete pool.""" try: pool = pool_api.delete_pool(name=name) - except AirflowException as e: - _log.error(e) - response = jsonify(error="{}".format(e)) - response.status_code = getattr(e, 'status', 500) + except AirflowException as err: + _log.error(err) + response = jsonify(error="{}".format(err)) + response.status_code = err.status_code return response else: return jsonify(pool.to_json()) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/tests/api/common/experimental/test_pool.py ---------------------------------------------------------------------- diff --git a/tests/api/common/experimental/test_pool.py b/tests/api/common/experimental/test_pool.py index 08b27c6..e5efa2c 100644 --- a/tests/api/common/experimental/test_pool.py +++ b/tests/api/common/experimental/test_pool.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,9 +19,10 @@ import unittest -from airflow.api.common.experimental import pool as pool_api from airflow import models from airflow import settings +from airflow.api.common.experimental import pool as pool_api +from airflow.exceptions import AirflowBadRequest, PoolNotFound class TestPool(unittest.TestCase): @@ -52,7 +53,7 @@ class TestPool(unittest.TestCase): self.assertEqual(pool.pool, self.pools[0].pool) def test_get_pool_non_existing(self): - self.assertRaisesRegexp(pool_api.PoolNotFound, + self.assertRaisesRegexp(PoolNotFound, "^Pool 'test' doesn't exist$", pool_api.get_pool, name='test', @@ -60,7 +61,7 @@ class TestPool(unittest.TestCase): def test_get_pool_bad_name(self): for name in ('', ' '): - self.assertRaisesRegexp(pool_api.PoolBadRequest, + self.assertRaisesRegexp(AirflowBadRequest, "^Pool name shouldn't be empty$", pool_api.get_pool, name=name, @@ -94,7 +95,7 @@ class TestPool(unittest.TestCase): def test_create_pool_bad_name(self): for name in ('', ' '): - self.assertRaisesRegexp(pool_api.PoolBadRequest, + self.assertRaisesRegexp(AirflowBadRequest, "^Pool name shouldn't be empty$", pool_api.create_pool, name=name, @@ -103,7 +104,7 @@ class TestPool(unittest.TestCase): session=self.session) def test_create_pool_bad_slots(self): - self.assertRaisesRegexp(pool_api.PoolBadRequest, + self.assertRaisesRegexp(AirflowBadRequest, "^Bad value for `slots`: foo$", pool_api.create_pool, name='foo', @@ -126,7 +127,7 @@ class TestPool(unittest.TestCase): def test_delete_pool_bad_name(self): for name in ('', ' '): - self.assertRaisesRegexp(pool_api.PoolBadRequest, + self.assertRaisesRegexp(AirflowBadRequest, "^Pool name shouldn't be empty$", pool_api.delete_pool, name=name,
