Repository: incubator-airflow Updated Branches: refs/heads/master 6e82f1d7c -> 3f1bfd38c
[AIRFLOW-2184] Add druid_checker_operator

Closes #3228 from feng-tao/airflow-2184


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3f1bfd38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3f1bfd38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3f1bfd38

Branch: refs/heads/master
Commit: 3f1bfd38cd5a1c9c58045004390a6a766bec5e8d
Parents: 6e82f1d
Author: Tao feng <[email protected]>
Authored: Tue Apr 17 11:12:41 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Tue Apr 17 11:12:41 2018 +0200

----------------------------------------------------------------------
 airflow/hooks/druid_hook.py                  |   7 +-
 airflow/operators/druid_check_operator.py    | 110 +++++++++++++++++++++++
 docs/code.rst                                |   1 +
 tests/operators/test_druid_check_operator.py |  97 ++++++++++++++++++++
 4 files changed, 212 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3f1bfd38/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index 97f8c4d..e8b61c0 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -127,7 +127,8 @@ class DruidDbApiHook(DbApiHook):
             path=conn.extra_dejson.get('endpoint', '/druid/v2/sql'),
             scheme=conn.extra_dejson.get('schema', 'http')
         )
-        self.log('Get the connection to druid broker on {host}'.format(host=conn.host))
+        self.log.info('Get the connection to druid '
+                      'broker on {host}'.format(host=conn.host))
         return druid_broker_conn
 
     def get_uri(self):


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3f1bfd38/airflow/operators/druid_check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/druid_check_operator.py b/airflow/operators/druid_check_operator.py
new file mode 100644
index 0000000..73f7915
--- /dev/null
+++ b/airflow/operators/druid_check_operator.py
@@ -0,0 +1,110 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from contextlib import closing
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.druid_hook import DruidDbApiHook
+from airflow.operators.check_operator import CheckOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DruidCheckOperator(CheckOperator):
+    """
+    Performs checks against Druid. The ``DruidCheckOperator`` expects
+    a sql query that will return a single row. Each value on that
+    first row is evaluated using python ``bool`` casting. If any of the
+    values return ``False`` the check is failed and errors out.
+
+    Note that Python bool casting evals the following as ``False``:
+
+    * ``False``
+    * ``0``
+    * Empty string (``""``)
+    * Empty list (``[]``)
+    * Empty dictionary or set (``{}``)
+
+    Given a query like ``SELECT COUNT(*) FROM foo``, it will fail only if
+    the count ``== 0``. You can craft much more complex queries that could,
+    for instance, check that the table has the same number of rows as
+    the source table upstream, or that the count of today's partition is
+    greater than yesterday's partition, or that a set of metrics are less
+    than 3 standard deviations from the 7-day average.
+    This operator can be used as a data quality check in your pipeline, and
+    depending on where you put it in your DAG, you have the choice to
+    stop the critical path, preventing dubious data from being published,
+    or to run it on the side and receive email alerts without stopping
+    the progress of the DAG.
+
+    :param sql: the sql to be executed
+    :type sql: string
+    :param druid_broker_conn_id: reference to the druid broker
+    :type druid_broker_conn_id: string
+    """
+
+    @apply_defaults
+    def __init__(
+        self, sql,
+        druid_broker_conn_id='druid_broker_default',
+        *args, **kwargs):
+        super(DruidCheckOperator, self).__init__(sql=sql, *args, **kwargs)
+        self.druid_broker_conn_id = druid_broker_conn_id
+        self.sql = sql
+
+    def get_db_hook(self):
+        """
+        Return the druid db api hook.
+        """
+        return DruidDbApiHook(druid_broker_conn_id=self.druid_broker_conn_id)
+
+    def get_first(self, sql):
+        """
+        Executes the druid sql to druid broker and returns the first resulting row.
+
+        Rows are fetched through a DB-API cursor (``fetchone`` is a cursor
+        method, not a connection method); both the cursor and the
+        connection are closed before returning.
+
+        :param sql: the sql statement to be executed (str)
+        :type sql: str
+        """
+        with closing(self.get_db_hook().get_conn()) as conn:
+            with closing(conn.cursor()) as cur:
+                cur.execute(sql)
+                return cur.fetchone()
+
+    def execute(self, context=None):
+        """
+        Run ``self.sql`` and fail unless it returns a row whose values
+        are all truthy.
+
+        :raises AirflowException: if the query returns no row, or if any
+            value in the first row is falsy.
+        """
+        self.log.info('Executing SQL check: %s', self.sql)
+        record = self.get_first(self.sql)
+        self.log.info('Record: %s', record)
+        if not record:
+            raise AirflowException("The query returned None")
+        # Per the class docstring, any falsy value in the row fails the check.
+        if not all(bool(value) for value in record):
+            raise AirflowException(
+                "Test failed.\nQuery:\n{q}\nResults:\n{r!s}".format(
+                    q=self.sql, r=record))
+        self.log.info("Success.")


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3f1bfd38/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index d4810f0..577b9d2 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -51,6 +51,7 @@ Operators
 .. autoclass:: airflow.operators.check_operator.CheckOperator
 .. autoclass:: airflow.operators.docker_operator.DockerOperator
 .. autoclass:: airflow.operators.dummy_operator.DummyOperator
+.. autoclass:: airflow.operators.druid_check_operator.DruidCheckOperator
 .. autoclass:: airflow.operators.email_operator.EmailOperator
 .. autoclass:: airflow.operators.generic_transfer.GenericTransfer
 .. autoclass:: airflow.operators.hive_to_druid.HiveToDruidTransfer


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3f1bfd38/tests/operators/test_druid_check_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/test_druid_check_operator.py b/tests/operators/test_druid_check_operator.py
new file mode 100644
index 0000000..f64ad7e
--- /dev/null
+++ b/tests/operators/test_druid_check_operator.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from datetime import datetime
+import unittest
+
+from airflow.models import DAG
+from airflow.exceptions import AirflowException
+from airflow.operators.druid_check_operator import DruidCheckOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class DruidCheckOperatorTest(unittest.TestCase):
+
+    def setUp(self):
+        self.task_id = 'test_task'
+        self.druid_broker_conn_id = 'default_conn'
+
+    def __construct_operator(self, sql):
+
+        dag = DAG('test_dag', start_date=datetime(2017, 1, 1))
+
+        return DruidCheckOperator(
+            dag=dag,
+            task_id=self.task_id,
+            druid_broker_conn_id=self.druid_broker_conn_id,
+            sql=sql)
+
+    @mock.patch.object(DruidCheckOperator, 'get_first')
+    def test_execute_pass(self, mock_get_first):
+        mock_get_first.return_value = [10]
+
+        sql = 'select count(*) from tab1 limit 1;'
+
+        operator = self.__construct_operator(sql)
+
+        try:
+            operator.execute(None)
+        except AirflowException:
+            self.fail('Exception should not be thrown!')
+
+    @mock.patch.object(DruidCheckOperator, 'get_first')
+    def test_execute_fail(self, mock_get_first):
+        mock_get_first.return_value = []
+        sql = 'select count(*) from tab1 limit 1;'
+
+        operator = self.__construct_operator(sql)
+
+        with self.assertRaises(AirflowException):
+            operator.execute()
+
+    @mock.patch.object(DruidCheckOperator, 'get_first')
+    def test_execute_fail_on_falsy_value(self, mock_get_first):
+        mock_get_first.return_value = [10, 0]
+        sql = 'select count(*), min(val) from tab1 limit 1;'
+
+        operator = self.__construct_operator(sql)
+
+        with self.assertRaises(AirflowException):
+            operator.execute()
+
+    @mock.patch.object(DruidCheckOperator, 'get_db_hook')
+    def test_get_first_uses_cursor(self, mock_get_db_hook):
+        conn = mock_get_db_hook.return_value.get_conn.return_value
+        cursor = conn.cursor.return_value
+        cursor.fetchone.return_value = (1,)
+
+        operator = self.__construct_operator('select 1;')
+
+        self.assertEqual(operator.get_first('select 1;'), (1,))
+        cursor.execute.assert_called_once_with('select 1;')
+        cursor.close.assert_called_once_with()
+        conn.close.assert_called_once_with()
