Repository: incubator-airflow Updated Branches: refs/heads/master 4ee4e474b -> c5776375f
[AIRFLOW-1315] Add Qubole File & Partition Sensors Closes #2401 from msumit/AIRFLOW-1315 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c5776375 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c5776375 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c5776375 Branch: refs/heads/master Commit: c5776375fdb8cc54edc0c67d623b3ea02a0f325a Parents: 4ee4e47 Author: Sumit Maheshwari <[email protected]> Authored: Tue Oct 31 19:32:07 2017 +0530 Committer: Sumit Maheshwari <[email protected]> Committed: Tue Oct 31 19:32:07 2017 +0530 ---------------------------------------------------------------------- .../example_dags/example_qubole_operator.py | 19 +++-- .../example_dags/example_qubole_sensor.py | 70 ++++++++++++++++ airflow/contrib/sensors/qubole_sensor.py | 86 ++++++++++++++++++++ setup.py | 1 + tests/contrib/sensors/test_qubole_sensor.py | 85 +++++++++++++++++++ 5 files changed, 256 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5776375/airflow/contrib/example_dags/example_qubole_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/example_dags/example_qubole_operator.py b/airflow/contrib/example_dags/example_qubole_operator.py index 03ba1dd..6d9340d 100644 --- a/airflow/contrib/example_dags/example_qubole_operator.py +++ b/airflow/contrib/example_dags/example_qubole_operator.py @@ -12,6 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +This is only an example DAG to highlight usage of QuboleOperator in various scenarios, +some of these tasks may or may not work based on your Qubole account setup. 
+ +To trigger it manually, run the following shell command from Qubole Analyze against +your Airflow cluster: `airflow trigger_dag example_qubole_operator`. + +*Note: Make sure that connection `qubole_default` is properly set before running this +example. Also be aware that it might spin up clusters to run these examples.* +""" + import airflow from airflow import DAG from airflow.operators.dummy_operator import DummyOperator @@ -20,8 +31,6 @@ from airflow.contrib.operators.qubole_operator import QuboleOperator import filecmp import random - - default_args = { 'owner': 'airflow', 'depends_on_past': False, @@ -31,9 +40,9 @@ default_args = { 'email_on_retry': False } -# NOTE:: This is only an example DAG to highlight usage of QuboleOperator in various scenarios, -# some of the tasks may or may not work based on your QDS account setup -dag = DAG('example_qubole_operator', default_args=default_args, schedule_interval='@daily') +dag = DAG('example_qubole_operator', default_args=default_args, schedule_interval=None) + +dag.doc_md = __doc__ def compare_result(ds, **kwargs): ti = kwargs['ti'] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5776375/airflow/contrib/example_dags/example_qubole_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/example_dags/example_qubole_sensor.py b/airflow/contrib/example_dags/example_qubole_sensor.py new file mode 100644 index 0000000..7e06fd4 --- /dev/null +++ b/airflow/contrib/example_dags/example_qubole_sensor.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This is only an example DAG to highlight usage of QuboleSensor in various scenarios, +some of these tasks may or may not work based on your QDS account setup. + +To trigger it manually, run the following shell command from Qubole Analyze against +your Airflow cluster: `airflow trigger_dag example_qubole_sensor`. + +*Note: Make sure that connection `qubole_default` is properly set before running +this example.* +""" + +from airflow import DAG +from airflow.contrib.sensors.qubole_sensor import QuboleFileSensor, QubolePartitionSensor +from airflow.utils import dates + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': dates.days_ago(2), + 'email': ['[email protected]'], + 'email_on_failure': False, + 'email_on_retry': False +} + +dag = DAG('example_qubole_sensor', default_args=default_args, schedule_interval=None) + +dag.doc_md = __doc__ + +t1 = QuboleFileSensor( + task_id='check_s3_file', + qubole_conn_id='qubole_default', + poke_interval=60, + timeout=600, + data={"files": + ["s3://paid-qubole/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar", + "s3://paid-qubole/HadoopAPITests/data/{{ ds.split('-')[2] }}.tsv" + ] # will check for availability of all the files in array + }, + dag=dag +) + +t2 = QubolePartitionSensor( + task_id='check_hive_partition', + poke_interval=10, + timeout=60, + data={"schema":"default", + "table":"my_partitioned_table", + "columns":[ + {"column" : "month", "values" : ["{{ ds.split('-')[1] }}"]}, + {"column" : "day", "values" : ["{{ ds.split('-')[2] }}" , "{{ yesterday_ds.split('-')[2] }}"]} + ]# will check for partitions like [month=12/day=12,month=12/day=13] + }, + dag=dag +) + +t1.set_downstream(t2) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5776375/airflow/contrib/sensors/qubole_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/qubole_sensor.py b/airflow/contrib/sensors/qubole_sensor.py new file mode 100644 index 0000000..6915361 --- /dev/null +++ b/airflow/contrib/sensors/qubole_sensor.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +from future import standard_library +standard_library.install_aliases() + +from airflow.operators.sensors import BaseSensorOperator +from airflow.hooks.base_hook import BaseHook +from airflow.utils.decorators import apply_defaults +from airflow.exceptions import AirflowException + +from qds_sdk.qubole import Qubole +from qds_sdk.sensors import FileSensor, PartitionSensor + + +class QuboleSensor(BaseSensorOperator): + """ + Base class for all Qubole Sensors + + :param qubole_conn_id: The qubole connection to run the sensor against + :type qubole_conn_id: string + :param data: a JSON object containing payload, whose presence needs to be checked + :type data: a JSON object + + .. note:: Both ``data`` and ``qubole_conn_id`` fields are template-supported. You can + also use ``.txt`` files for template driven use cases.
+ """ + + template_fields = ('data', 'qubole_conn_id') + + template_ext = ('.txt',) + + @apply_defaults + def __init__(self, data, qubole_conn_id="qubole_default", *args, **kwargs): + self.data = data + self.qubole_conn_id = qubole_conn_id + + if 'poke_interval' in kwargs and kwargs['poke_interval'] < 5: + raise AirflowException("Sorry, poke_interval can't be less than 5 sec for " + "task '{0}' in dag '{1}'." + .format(kwargs.get('task_id'), getattr(kwargs.get('dag'), 'dag_id', None))) + + super(QuboleSensor, self).__init__(*args, **kwargs) + + def poke(self, context): + conn = BaseHook.get_connection(self.qubole_conn_id) + Qubole.configure(api_token=conn.password, api_url=conn.host) + + self.log.info('Poking: %s', self.data) + + status = False + try: + status = self.sensor_class.check(self.data) + except Exception as e: + self.log.exception(e) + status = False + + self.log.info('Status of this Poke: %s', status) + + return status + + +class QuboleFileSensor(QuboleSensor): + @apply_defaults + def __init__(self, *args, **kwargs): + self.sensor_class = FileSensor + super(QuboleFileSensor, self).__init__(*args, **kwargs) + + +class QubolePartitionSensor(QuboleSensor): + @apply_defaults + def __init__(self, *args, **kwargs): + self.sensor_class = PartitionSensor + super(QubolePartitionSensor, self).__init__(*args, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5776375/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py index 4f1f20a..9c615ab 100644 --- a/setup.py +++ b/setup.py @@ -187,6 +187,7 @@ devel = [ 'nose-ignore-docstring==0.2', 'nose-timer', 'parameterized', + 'qds-sdk>=1.9.6', 'rednose', 'paramiko', 'requests_mock' http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5776375/tests/contrib/sensors/test_qubole_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_qubole_sensor.py b/tests/contrib/sensors/test_qubole_sensor.py
new file mode 100644 index 0000000..035b231 --- /dev/null +++ b/tests/contrib/sensors/test_qubole_sensor.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest +from mock import patch + +from datetime import datetime + +from airflow.models import DAG, Connection +from airflow.utils import db +from airflow.exceptions import AirflowException + +from airflow.contrib.sensors.qubole_sensor import QuboleFileSensor, QubolePartitionSensor + +DAG_ID = "qubole_test_dag" +TASK_ID = "test_task" +DEFAULT_CONN = "qubole_default" +TEMPLATE_CONN = "my_conn_id" +DEFAULT_DATE = datetime(2017, 1, 1) + + +class QuboleSensorTest(unittest.TestCase): + def setUp(self): + db.merge_conn( + Connection(conn_id=DEFAULT_CONN, conn_type='HTTP')) + + @patch('airflow.contrib.sensors.qubole_sensor.QuboleFileSensor.poke') + def test_file_sensor(self, patched_poke): + patched_poke.return_value = True + + sensor = QuboleFileSensor( + task_id='test_qubole_file_sensor', + data={"files": + ["s3://some_bucket/some_file"] + } + ) + + self.assertTrue(sensor.poke({})) + + @patch('airflow.contrib.sensors.qubole_sensor.QubolePartitionSensor.poke') + def test_partition_sensor(self, patched_poke): + patched_poke.return_value = True + + sensor = QubolePartitionSensor( + task_id='test_qubole_partition_sensor', + data={"schema":"default", + "table":"my_partitioned_table", + "columns":[ + {"column" : "month", "values" : ["1", "2"]}, + ] + }, + ) +
+ self.assertTrue(sensor.poke({})) + + @patch('airflow.contrib.sensors.qubole_sensor.QubolePartitionSensor.poke') + def test_partition_sensor_error(self, patched_poke): + patched_poke.return_value = True + + dag = DAG(DAG_ID, start_date=DEFAULT_DATE) + + with self.assertRaises(AirflowException): + QubolePartitionSensor( + task_id='test_qubole_partition_sensor', + poke_interval=1, + data={"schema":"default", + "table":"my_partitioned_table", + "columns":[ + {"column" : "month", "values" : ["1", "2"]}, + ] + }, + dag=dag + )
