Repository: incubator-airflow
Updated Branches:
  refs/heads/master 8c4131ba3 -> 5676ec79d


[AIRFLOW-2608] Implements/Standardize custom exceptions for experimental APIs

Implements/Standardize custom exceptions for
experimental APIs

Implements/Standardize custom exceptions for
experimental APIs

Closes #3496 from verdan/AIRFLOW-2608-api-
exceptions-handling


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5676ec79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5676ec79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5676ec79

Branch: refs/heads/master
Commit: 5676ec79d5f55f1c33383a7aa88cb3aed2a8d07e
Parents: 8c4131b
Author: Verdan Mahmood <[email protected]>
Authored: Tue Jun 19 10:27:14 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Tue Jun 19 10:27:14 2018 +0200

----------------------------------------------------------------------
 airflow/api/common/experimental/delete_dag.py   | 12 ++---
 .../common/experimental/get_dag_run_state.py    |  6 +--
 airflow/api/common/experimental/get_task.py     |  6 +--
 .../common/experimental/get_task_instance.py    | 11 +++--
 airflow/api/common/experimental/pool.py         | 18 ++-----
 airflow/api/common/experimental/trigger_dag.py  |  6 +--
 airflow/exceptions.py                           | 51 +++++++++++++++++++-
 airflow/www/api/experimental/endpoints.py       | 46 +++++++++---------
 airflow/www_rbac/api/experimental/endpoints.py  | 40 +++++++--------
 tests/api/common/experimental/test_pool.py      | 17 ++++---
 10 files changed, 125 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/api/common/experimental/delete_dag.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/delete_dag.py 
b/airflow/api/common/experimental/delete_dag.py
index a37ac96..b9ce736 100644
--- a/airflow/api/common/experimental/delete_dag.py
+++ b/airflow/api/common/experimental/delete_dag.py
@@ -17,17 +17,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from airflow import models, settings
-from airflow.exceptions import AirflowException
 from sqlalchemy import or_
 
-
-class DagFileExists(AirflowException):
-    status = 400
-
-
-class DagNotFound(AirflowException):
-    status = 404
+from airflow import models, settings
+from airflow.exceptions import DagNotFound, DagFileExists
 
 
 def delete_dag(dag_id):
@@ -45,6 +38,7 @@ def delete_dag(dag_id):
 
     count = 0
 
+    # noinspection PyUnresolvedReferences,PyProtectedMember
     for m in models.Base._decl_class_registry.values():
         if hasattr(m, "dag_id"):
             cond = or_(m.dag_id == dag_id, m.dag_id.like(dag_id + ".%"))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/api/common/experimental/get_dag_run_state.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/get_dag_run_state.py 
b/airflow/api/common/experimental/get_dag_run_state.py
index a7bd131..a37aff6 100644
--- a/airflow/api/common/experimental/get_dag_run_state.py
+++ b/airflow/api/common/experimental/get_dag_run_state.py
@@ -17,7 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import DagNotFound, DagRunNotFound
 from airflow.models import DagBag
 
 
@@ -29,7 +29,7 @@ def get_dag_run_state(dag_id, execution_date):
     # Check DAG exists.
     if dag_id not in dagbag.dags:
         error_message = "Dag id {} not found".format(dag_id)
-        raise AirflowException(error_message)
+        raise DagNotFound(error_message)
 
     # Get DAG object and check Task Exists
     dag = dagbag.get_dag(dag_id)
@@ -39,6 +39,6 @@ def get_dag_run_state(dag_id, execution_date):
     if not dagrun:
         error_message = ('Dag Run for date {} not found in dag {}'
                          .format(execution_date, dag_id))
-        raise AirflowException(error_message)
+        raise DagRunNotFound(error_message)
 
     return {'state': dagrun.get_state()}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/api/common/experimental/get_task.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/get_task.py 
b/airflow/api/common/experimental/get_task.py
index 472441d..96aa35a 100644
--- a/airflow/api/common/experimental/get_task.py
+++ b/airflow/api/common/experimental/get_task.py
@@ -17,7 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import DagNotFound, TaskNotFound
 from airflow.models import DagBag
 
 
@@ -28,13 +28,13 @@ def get_task(dag_id, task_id):
     # Check DAG exists.
     if dag_id not in dagbag.dags:
         error_message = "Dag id {} not found".format(dag_id)
-        raise AirflowException(error_message)
+        raise DagNotFound(error_message)
 
     # Get DAG object and check Task Exists
     dag = dagbag.get_dag(dag_id)
     if not dag.has_task(task_id):
         error_message = 'Task {} not found in dag {}'.format(task_id, dag_id)
-        raise AirflowException(error_message)
+        raise TaskNotFound(error_message)
 
     # Return the task.
     return dag.get_task(task_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/api/common/experimental/get_task_instance.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/get_task_instance.py 
b/airflow/api/common/experimental/get_task_instance.py
index cada270..c9f59cd 100644
--- a/airflow/api/common/experimental/get_task_instance.py
+++ b/airflow/api/common/experimental/get_task_instance.py
@@ -17,7 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import (DagNotFound, TaskNotFound,
+                                DagRunNotFound, TaskInstanceNotFound)
 from airflow.models import DagBag
 
 
@@ -29,26 +30,26 @@ def get_task_instance(dag_id, task_id, execution_date):
     # Check DAG exists.
     if dag_id not in dagbag.dags:
         error_message = "Dag id {} not found".format(dag_id)
-        raise AirflowException(error_message)
+        raise DagNotFound(error_message)
 
     # Get DAG object and check Task Exists
     dag = dagbag.get_dag(dag_id)
     if not dag.has_task(task_id):
         error_message = 'Task {} not found in dag {}'.format(task_id, dag_id)
-        raise AirflowException(error_message)
+        raise TaskNotFound(error_message)
 
     # Get DagRun object and check that it exists
     dagrun = dag.get_dagrun(execution_date=execution_date)
     if not dagrun:
         error_message = ('Dag Run for date {} not found in dag {}'
                          .format(execution_date, dag_id))
-        raise AirflowException(error_message)
+        raise DagRunNotFound(error_message)
 
     # Get task instance object and check that it exists
     task_instance = dagrun.get_task_instance(task_id)
     if not task_instance:
         error_message = ('Task {} instance for date {} not found'
                          .format(task_id, execution_date))
-        raise AirflowException(error_message)
+        raise TaskInstanceNotFound(error_message)
 
     return task_instance

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/api/common/experimental/pool.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/pool.py 
b/airflow/api/common/experimental/pool.py
index 57a6d97..f125036 100644
--- a/airflow/api/common/experimental/pool.py
+++ b/airflow/api/common/experimental/pool.py
@@ -17,24 +17,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowBadRequest, PoolNotFound
 from airflow.models import Pool
 from airflow.utils.db import provide_session
 
 
-class PoolBadRequest(AirflowException):
-    status = 400
-
-
-class PoolNotFound(AirflowException):
-    status = 404
-
-
 @provide_session
 def get_pool(name, session=None):
     """Get pool by a given name."""
     if not (name and name.strip()):
-        raise PoolBadRequest("Pool name shouldn't be empty")
+        raise AirflowBadRequest("Pool name shouldn't be empty")
 
     pool = session.query(Pool).filter_by(pool=name).first()
     if pool is None:
@@ -53,12 +45,12 @@ def get_pools(session=None):
 def create_pool(name, slots, description, session=None):
     """Create a pool with a given parameters."""
     if not (name and name.strip()):
-        raise PoolBadRequest("Pool name shouldn't be empty")
+        raise AirflowBadRequest("Pool name shouldn't be empty")
 
     try:
         slots = int(slots)
     except ValueError:
-        raise PoolBadRequest("Bad value for `slots`: %s" % slots)
+        raise AirflowBadRequest("Bad value for `slots`: %s" % slots)
 
     session.expire_on_commit = False
     pool = session.query(Pool).filter_by(pool=name).first()
@@ -78,7 +70,7 @@ def create_pool(name, slots, description, session=None):
 def delete_pool(name, session=None):
     """Delete pool by a given name."""
     if not (name and name.strip()):
-        raise PoolBadRequest("Pool name shouldn't be empty")
+        raise AirflowBadRequest("Pool name shouldn't be empty")
 
     pool = session.query(Pool).filter_by(pool=name).first()
     if pool is None:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/api/common/experimental/trigger_dag.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/trigger_dag.py 
b/airflow/api/common/experimental/trigger_dag.py
index fd9b51f..86be6aa 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -19,7 +19,7 @@
 
 import json
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import DagRunAlreadyExists, DagNotFound
 from airflow.models import DagRun, DagBag
 from airflow.utils import timezone
 from airflow.utils.state import State
@@ -35,7 +35,7 @@ def _trigger_dag(
         replace_microseconds,
 ):
     if dag_id not in dag_bag.dags:
-        raise AirflowException("Dag id {} not found".format(dag_id))
+        raise DagNotFound("Dag id {} not found".format(dag_id))
 
     dag = dag_bag.get_dag(dag_id)
 
@@ -52,7 +52,7 @@ def _trigger_dag(
 
     dr = dag_run.find(dag_id=dag_id, run_id=run_id)
     if dr:
-        raise AirflowException("Run id {} already exists for dag id {}".format(
+        raise DagRunAlreadyExists("Run id {} already exists for dag id 
{}".format(
             run_id,
             dag_id
         ))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/exceptions.py
----------------------------------------------------------------------
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 96e58b4..89f3d0e 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -22,7 +22,21 @@
 
 
 class AirflowException(Exception):
-    pass
+    """
+    Base class for all Airflow's errors.
+    Each custom exception should be derived from this class
+    """
+    status_code = 500
+
+
+class AirflowBadRequest(AirflowException):
+    """Raise when the application or server cannot handle the request"""
+    status_code = 400
+
+
+class AirflowNotFoundException(AirflowException):
+    """Raise when the requested object/resource is not available in the 
system"""
+    status_code = 404
 
 
 class AirflowConfigException(AirflowException):
@@ -47,3 +61,38 @@ class AirflowSkipException(AirflowException):
 
 class AirflowDagCycleException(AirflowException):
     pass
+
+
+class DagNotFound(AirflowNotFoundException):
+    """Raise when a DAG is not available in the system"""
+    pass
+
+
+class DagRunNotFound(AirflowNotFoundException):
+    """Raise when a DAG Run is not available in the system"""
+    pass
+
+
+class DagRunAlreadyExists(AirflowBadRequest):
+    """Raise when creating a DAG run for DAG which already has DAG run entry"""
+    pass
+
+
+class DagFileExists(AirflowBadRequest):
+    """Raise when a DAG ID is still in DagBag i.e., DAG file is in DAG 
folder"""
+    pass
+
+
+class TaskNotFound(AirflowNotFoundException):
+    """Raise when a Task is not available in the system"""
+    pass
+
+
+class TaskInstanceNotFound(AirflowNotFoundException):
+    """Raise when a Task Instance is not available in the system"""
+    pass
+
+
+class PoolNotFound(AirflowNotFoundException):
+    """Raise when a Pool is not available in the system"""
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/www/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www/api/experimental/endpoints.py 
b/airflow/www/api/experimental/endpoints.py
index 1778000..f0bc319 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -79,7 +79,7 @@ def trigger_dag(dag_id):
     except AirflowException as err:
         _log.error(err)
         response = jsonify(error="{}".format(err))
-        response.status_code = 404
+        response.status_code = err.status_code
         return response
 
     if getattr(g, 'user', None):
@@ -98,10 +98,10 @@ def delete_dag(dag_id):
     """
     try:
         count = delete.delete_dag(dag_id)
-    except AirflowException as e:
-        _log.error(e)
-        response = jsonify(error="{}".format(e))
-        response.status_code = getattr(e, 'status', 500)
+    except AirflowException as err:
+        _log.error(err)
+        response = jsonify(error="{}".format(err))
+        response.status_code = err.status_code
         return response
     return jsonify(message="Removed {} record(s)".format(count), count=count)
 
@@ -121,7 +121,7 @@ def task_info(dag_id, task_id):
     except AirflowException as err:
         _log.info(err)
         response = jsonify(error="{}".format(err))
-        response.status_code = 404
+        response.status_code = err.status_code
         return response
 
     # JSONify and return.
@@ -162,7 +162,7 @@ def task_instance_info(dag_id, execution_date, task_id):
     except AirflowException as err:
         _log.info(err)
         response = jsonify(error="{}".format(err))
-        response.status_code = 404
+        response.status_code = err.status_code
         return response
 
     # JSONify and return.
@@ -198,10 +198,10 @@ def get_pool(name):
     """Get pool by a given name."""
     try:
         pool = pool_api.get_pool(name=name)
-    except AirflowException as e:
-        _log.error(e)
-        response = jsonify(error="{}".format(e))
-        response.status_code = getattr(e, 'status', 500)
+    except AirflowException as err:
+        _log.error(err)
+        response = jsonify(error="{}".format(err))
+        response.status_code = err.status_code
         return response
     else:
         return jsonify(pool.to_json())
@@ -213,10 +213,10 @@ def get_pools():
     """Get all pools."""
     try:
         pools = pool_api.get_pools()
-    except AirflowException as e:
-        _log.error(e)
-        response = jsonify(error="{}".format(e))
-        response.status_code = getattr(e, 'status', 500)
+    except AirflowException as err:
+        _log.error(err)
+        response = jsonify(error="{}".format(err))
+        response.status_code = err.status_code
         return response
     else:
         return jsonify([p.to_json() for p in pools])
@@ -230,10 +230,10 @@ def create_pool():
     params = request.get_json(force=True)
     try:
         pool = pool_api.create_pool(**params)
-    except AirflowException as e:
-        _log.error(e)
-        response = jsonify(error="{}".format(e))
-        response.status_code = getattr(e, 'status', 500)
+    except AirflowException as err:
+        _log.error(err)
+        response = jsonify(error="{}".format(err))
+        response.status_code = err.status_code
         return response
     else:
         return jsonify(pool.to_json())
@@ -246,10 +246,10 @@ def delete_pool(name):
     """Delete pool."""
     try:
         pool = pool_api.delete_pool(name=name)
-    except AirflowException as e:
-        _log.error(e)
-        response = jsonify(error="{}".format(e))
-        response.status_code = getattr(e, 'status', 500)
+    except AirflowException as err:
+        _log.error(err)
+        response = jsonify(error="{}".format(err))
+        response.status_code = err.status_code
         return response
     else:
         return jsonify(pool.to_json())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/airflow/www_rbac/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/api/experimental/endpoints.py 
b/airflow/www_rbac/api/experimental/endpoints.py
index a594b38..8663bad 100644
--- a/airflow/www_rbac/api/experimental/endpoints.py
+++ b/airflow/www_rbac/api/experimental/endpoints.py
@@ -81,7 +81,7 @@ def trigger_dag(dag_id):
     except AirflowException as err:
         _log.error(err)
         response = jsonify(error="{}".format(err))
-        response.status_code = 404
+        response.status_code = err.status_code
         return response
 
     if getattr(g, 'user', None):
@@ -128,7 +128,7 @@ def task_info(dag_id, task_id):
     except AirflowException as err:
         _log.info(err)
         response = jsonify(error="{}".format(err))
-        response.status_code = 404
+        response.status_code = err.status_code
         return response
 
     # JSONify and return.
@@ -191,7 +191,7 @@ def task_instance_info(dag_id, execution_date, task_id):
     except AirflowException as err:
         _log.info(err)
         response = jsonify(error="{}".format(err))
-        response.status_code = 404
+        response.status_code = err.status_code
         return response
 
     # JSONify and return.
@@ -232,7 +232,7 @@ def dag_run_status(dag_id, execution_date):
     except AirflowException as err:
         _log.info(err)
         response = jsonify(error="{}".format(err))
-        response.status_code = 404
+        response.status_code = err.status_code
         return response
 
     return jsonify(info)
@@ -263,10 +263,10 @@ def get_pool(name):
     """Get pool by a given name."""
     try:
         pool = pool_api.get_pool(name=name)
-    except AirflowException as e:
-        _log.error(e)
-        response = jsonify(error="{}".format(e))
-        response.status_code = getattr(e, 'status', 500)
+    except AirflowException as err:
+        _log.error(err)
+        response = jsonify(error="{}".format(err))
+        response.status_code = err.status_code
         return response
     else:
         return jsonify(pool.to_json())
@@ -278,10 +278,10 @@ def get_pools():
     """Get all pools."""
     try:
         pools = pool_api.get_pools()
-    except AirflowException as e:
-        _log.error(e)
-        response = jsonify(error="{}".format(e))
-        response.status_code = getattr(e, 'status', 500)
+    except AirflowException as err:
+        _log.error(err)
+        response = jsonify(error="{}".format(err))
+        response.status_code = err.status_code
         return response
     else:
         return jsonify([p.to_json() for p in pools])
@@ -295,10 +295,10 @@ def create_pool():
     params = request.get_json(force=True)
     try:
         pool = pool_api.create_pool(**params)
-    except AirflowException as e:
-        _log.error(e)
-        response = jsonify(error="{}".format(e))
-        response.status_code = getattr(e, 'status', 500)
+    except AirflowException as err:
+        _log.error(err)
+        response = jsonify(error="{}".format(err))
+        response.status_code = err.status_code
         return response
     else:
         return jsonify(pool.to_json())
@@ -311,10 +311,10 @@ def delete_pool(name):
     """Delete pool."""
     try:
         pool = pool_api.delete_pool(name=name)
-    except AirflowException as e:
-        _log.error(e)
-        response = jsonify(error="{}".format(e))
-        response.status_code = getattr(e, 'status', 500)
+    except AirflowException as err:
+        _log.error(err)
+        response = jsonify(error="{}".format(err))
+        response.status_code = err.status_code
         return response
     else:
         return jsonify(pool.to_json())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5676ec79/tests/api/common/experimental/test_pool.py
----------------------------------------------------------------------
diff --git a/tests/api/common/experimental/test_pool.py 
b/tests/api/common/experimental/test_pool.py
index 08b27c6..e5efa2c 100644
--- a/tests/api/common/experimental/test_pool.py
+++ b/tests/api/common/experimental/test_pool.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,9 +19,10 @@
 
 import unittest
 
-from airflow.api.common.experimental import pool as pool_api
 from airflow import models
 from airflow import settings
+from airflow.api.common.experimental import pool as pool_api
+from airflow.exceptions import AirflowBadRequest, PoolNotFound
 
 
 class TestPool(unittest.TestCase):
@@ -52,7 +53,7 @@ class TestPool(unittest.TestCase):
         self.assertEqual(pool.pool, self.pools[0].pool)
 
     def test_get_pool_non_existing(self):
-        self.assertRaisesRegexp(pool_api.PoolNotFound,
+        self.assertRaisesRegexp(PoolNotFound,
                                 "^Pool 'test' doesn't exist$",
                                 pool_api.get_pool,
                                 name='test',
@@ -60,7 +61,7 @@ class TestPool(unittest.TestCase):
 
     def test_get_pool_bad_name(self):
         for name in ('', '    '):
-            self.assertRaisesRegexp(pool_api.PoolBadRequest,
+            self.assertRaisesRegexp(AirflowBadRequest,
                                     "^Pool name shouldn't be empty$",
                                     pool_api.get_pool,
                                     name=name,
@@ -94,7 +95,7 @@ class TestPool(unittest.TestCase):
 
     def test_create_pool_bad_name(self):
         for name in ('', '    '):
-            self.assertRaisesRegexp(pool_api.PoolBadRequest,
+            self.assertRaisesRegexp(AirflowBadRequest,
                                     "^Pool name shouldn't be empty$",
                                     pool_api.create_pool,
                                     name=name,
@@ -103,7 +104,7 @@ class TestPool(unittest.TestCase):
                                     session=self.session)
 
     def test_create_pool_bad_slots(self):
-        self.assertRaisesRegexp(pool_api.PoolBadRequest,
+        self.assertRaisesRegexp(AirflowBadRequest,
                                 "^Bad value for `slots`: foo$",
                                 pool_api.create_pool,
                                 name='foo',
@@ -126,7 +127,7 @@ class TestPool(unittest.TestCase):
 
     def test_delete_pool_bad_name(self):
         for name in ('', '    '):
-            self.assertRaisesRegexp(pool_api.PoolBadRequest,
+            self.assertRaisesRegexp(AirflowBadRequest,
                                     "^Pool name shouldn't be empty$",
                                     pool_api.delete_pool,
                                     name=name,

Reply via email to