Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6e82f1d7c -> 3f1bfd38c


[AIRFLOW-2184] Add druid_checker_operator

Closes #3228 from feng-tao/airflow-2184


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3f1bfd38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3f1bfd38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3f1bfd38

Branch: refs/heads/master
Commit: 3f1bfd38cd5a1c9c58045004390a6a766bec5e8d
Parents: 6e82f1d
Author: Tao feng <[email protected]>
Authored: Tue Apr 17 11:12:41 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Tue Apr 17 11:12:41 2018 +0200

----------------------------------------------------------------------
 airflow/hooks/druid_hook.py                  |  7 +-
 airflow/operators/druid_check_operator.py    | 91 +++++++++++++++++++++++
 docs/code.rst                                |  1 +
 tests/operators/test_druid_check_operator.py | 74 ++++++++++++++++++
 4 files changed, 170 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3f1bfd38/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index 97f8c4d..e8b61c0 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -127,7 +127,8 @@ class DruidDbApiHook(DbApiHook):
             path=conn.extra_dejson.get('endpoint', '/druid/v2/sql'),
             scheme=conn.extra_dejson.get('schema', 'http')
         )
-        self.log('Get the connection to druid broker on 
{host}'.format(host=conn.host))
+        self.log.info('Get the connection to druid '
+                      'broker on {host}'.format(host=conn.host))
         return druid_broker_conn
 
     def get_uri(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3f1bfd38/airflow/operators/druid_check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/druid_check_operator.py b/airflow/operators/druid_check_operator.py
new file mode 100644
index 0000000..73f7915
--- /dev/null
+++ b/airflow/operators/druid_check_operator.py
@@ -0,0 +1,109 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.druid_hook import DruidDbApiHook
+from airflow.operators.check_operator import CheckOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DruidCheckOperator(CheckOperator):
+    """
+    Performs checks against Druid. The ``DruidCheckOperator`` expects
+    a sql query that will return a single row. Each value on that
+    first row is evaluated using python ``bool`` casting. If any of the
+    values return ``False`` the check is failed and errors out.
+
+    Note that Python bool casting evals the following as ``False``:
+
+    * ``False``
+    * ``0``
+    * Empty string (``""``)
+    * Empty list (``[]``)
+    * Empty dictionary or set (``{}``)
+
+    Given a query like ``SELECT COUNT(*) FROM foo``, it will fail only if
+    the count ``== 0``. You can craft much more complex queries that could,
+    for instance, check that the table has the same number of rows as
+    the source table upstream, or that the count of today's partition is
+    greater than yesterday's partition, or that a set of metrics are less
+    than 3 standard deviations for the 7 day average.
+    This operator can be used as a data quality check in your pipeline, and
+    depending on where you put it in your DAG, you have the choice to
+    stop the critical path, preventing dubious data from being published,
+    or to run it on the side and receive email alerts without stopping
+    the progress of the DAG.
+
+    :param sql: the sql to be executed
+    :type sql: str
+    :param druid_broker_conn_id: reference to the druid broker
+    :type druid_broker_conn_id: str
+    """
+
+    @apply_defaults
+    def __init__(
+            self, sql,
+            druid_broker_conn_id='druid_broker_default',
+            *args, **kwargs):
+        super(DruidCheckOperator, self).__init__(sql=sql, *args, **kwargs)
+        self.druid_broker_conn_id = druid_broker_conn_id
+        self.sql = sql
+
+    def get_db_hook(self):
+        """
+        Return the druid db api hook.
+        """
+        return DruidDbApiHook(druid_broker_conn_id=self.druid_broker_conn_id)
+
+    def get_first(self, sql):
+        """
+        Executes the druid sql against the druid broker and returns the
+        first resulting row.
+
+        :param sql: the sql statement to be executed (str)
+        :type sql: str
+        :return: the first row fetched from the cursor (presumably ``None``
+            when the query yields no rows, per the DB-API ``fetchone``
+            convention -- confirm against the druid driver)
+        """
+        # The connection's context manager is expected to yield a cursor;
+        # ``execute``/``fetchone`` are called on it below.
+        with self.get_db_hook().get_conn() as cur:
+            cur.execute(sql)
+            return cur.fetchone()
+
+    def execute(self, context=None):
+        """
+        Run the check query and fail unless every value of the first
+        row is truthy.
+
+        :raises AirflowException: if the query returned no row, or if
+            any value of the first row is falsy.
+        """
+        self.log.info('Executing SQL check: %s', self.sql)
+        record = self.get_first(self.sql)
+        self.log.info('Record: %s', record)
+        if not record:
+            raise AirflowException("The query returned None")
+        # Enforce the documented contract: every value must be truthy.
+        if not all(bool(value) for value in record):
+            raise AirflowException(
+                "Test failed.\nQuery:\n{q}\nResults:\n{r!s}".format(
+                    q=self.sql, r=record))
+        self.log.info("Success.")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3f1bfd38/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index d4810f0..577b9d2 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -51,6 +51,7 @@ Operators
 .. autoclass:: airflow.operators.check_operator.CheckOperator
 .. autoclass:: airflow.operators.docker_operator.DockerOperator
 .. autoclass:: airflow.operators.dummy_operator.DummyOperator
+.. autoclass:: airflow.operators.druid_check_operator.DruidCheckOperator
 .. autoclass:: airflow.operators.email_operator.EmailOperator
 .. autoclass:: airflow.operators.generic_transfer.GenericTransfer
 .. autoclass:: airflow.operators.hive_to_druid.HiveToDruidTransfer

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3f1bfd38/tests/operators/test_druid_check_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/test_druid_check_operator.py b/tests/operators/test_druid_check_operator.py
new file mode 100644
index 0000000..f64ad7e
--- /dev/null
+++ b/tests/operators/test_druid_check_operator.py
@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from datetime import datetime
+import unittest
+
+from airflow.models import DAG
+from airflow.exceptions import AirflowException
+from airflow.operators.druid_check_operator import DruidCheckOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        # NOTE(review): with ``mock`` set to None, the ``@mock.patch.object``
+        # decorators below fail at import time rather than skipping.
+        mock = None
+
+
+class DruidCheckOperatorTest(unittest.TestCase):
+    """Tests for DruidCheckOperator.execute with get_first mocked."""
+
+    def setUp(self):
+        self.task_id = 'test_task'
+        self.druid_broker_conn_id = 'default_conn'
+        self.sql = 'select count(*) from tab1 limit 1;'
+
+    def __construct_operator(self, sql):
+        # Build the operator in a throwaway DAG; no broker is contacted
+        # because every test patches ``get_first``.
+        dag = DAG('test_dag', start_date=datetime(2017, 1, 1))
+
+        return DruidCheckOperator(
+            dag=dag,
+            task_id=self.task_id,
+            druid_broker_conn_id=self.druid_broker_conn_id,
+            sql=sql)
+
+    @mock.patch.object(DruidCheckOperator, 'get_first')
+    def test_execute_pass(self, mock_get_first):
+        mock_get_first.return_value = [10]
+
+        operator = self.__construct_operator(self.sql)
+
+        # Any AirflowException raised here fails the test.
+        operator.execute(None)
+        # The configured SQL must be forwarded unchanged.
+        mock_get_first.assert_called_once_with(self.sql)
+
+    @mock.patch.object(DruidCheckOperator, 'get_first')
+    def test_execute_fail(self, mock_get_first):
+        mock_get_first.return_value = []
+
+        operator = self.__construct_operator(self.sql)
+
+        with self.assertRaises(AirflowException):
+            operator.execute()
+
+    @mock.patch.object(DruidCheckOperator, 'get_first')
+    def test_execute_fail_on_none(self, mock_get_first):
+        mock_get_first.return_value = None
+
+        operator = self.__construct_operator(self.sql)
+
+        with self.assertRaises(AirflowException):
+            operator.execute()

Reply via email to