Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4ee4e474b -> c5776375f


[AIRFLOW-1315] Add Qubole File & Partition Sensors

Closes #2401 from msumit/AIRFLOW-1315


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c5776375
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c5776375
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c5776375

Branch: refs/heads/master
Commit: c5776375fdb8cc54edc0c67d623b3ea02a0f325a
Parents: 4ee4e47
Author: Sumit Maheshwari <[email protected]>
Authored: Tue Oct 31 19:32:07 2017 +0530
Committer: Sumit Maheshwari <[email protected]>
Committed: Tue Oct 31 19:32:07 2017 +0530

----------------------------------------------------------------------
 .../example_dags/example_qubole_operator.py     | 19 +++--
 .../example_dags/example_qubole_sensor.py       | 70 ++++++++++++++++
 airflow/contrib/sensors/qubole_sensor.py        | 86 ++++++++++++++++++++
 setup.py                                        |  1 +
 tests/contrib/sensors/test_qubole_sensor.py     | 85 +++++++++++++++++++
 5 files changed, 256 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5776375/airflow/contrib/example_dags/example_qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_qubole_operator.py 
b/airflow/contrib/example_dags/example_qubole_operator.py
index 03ba1dd..6d9340d 100644
--- a/airflow/contrib/example_dags/example_qubole_operator.py
+++ b/airflow/contrib/example_dags/example_qubole_operator.py
@@ -12,6 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""
+This is only an example DAG to highlight usage of QuboleOperator in various 
scenarios.
+Some of these tasks may or may not work, depending on your Qubole account setup.
+
+To trigger it manually, run the following from Qubole Analyze against your
+Airflow cluster: `airflow trigger_dag example_qubole_operator`.
+
+*Note: Make sure that connection `qubole_default` is properly set before 
running this
+example. Also be aware that it might spin up clusters to run these examples.*
+"""
+
 import airflow
 from airflow import DAG
 from airflow.operators.dummy_operator import DummyOperator
@@ -20,8 +31,6 @@ from airflow.contrib.operators.qubole_operator import 
QuboleOperator
 import filecmp
 import random
 
-
-
 default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
@@ -31,9 +40,9 @@ default_args = {
     'email_on_retry': False
 }
 
-# NOTE:: This is only an example DAG to highlight usage of QuboleOperator in 
various scenarios,
-# some of the tasks may or may not work based on your QDS account setup
-dag = DAG('example_qubole_operator', default_args=default_args, 
schedule_interval='@daily')
+dag = DAG('example_qubole_operator', default_args=default_args, 
schedule_interval=None)
+
+dag.doc_md = __doc__
 
 def compare_result(ds, **kwargs):
     ti = kwargs['ti']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5776375/airflow/contrib/example_dags/example_qubole_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_qubole_sensor.py 
b/airflow/contrib/example_dags/example_qubole_sensor.py
new file mode 100644
index 0000000..7e06fd4
--- /dev/null
+++ b/airflow/contrib/example_dags/example_qubole_sensor.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This is only an example DAG to highlight usage of QuboleSensor in various 
scenarios.
+Some of these tasks may or may not work, depending on your QDS account setup.
+
+To trigger it manually, run the following from Qubole Analyze against your
+Airflow cluster: `airflow trigger_dag example_qubole_sensor`.
+
+*Note: Make sure that connection `qubole_default` is properly set before 
running
+this example.*
+"""
+
+from airflow import DAG
+from airflow.contrib.sensors.qubole_sensor import QuboleFileSensor, 
QubolePartitionSensor
+from airflow.utils import dates
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': dates.days_ago(2),
+    'email': ['[email protected]'],
+    'email_on_failure': False,
+    'email_on_retry': False
+}
+
+dag = DAG('example_qubole_sensor', default_args=default_args, 
schedule_interval=None)
+
+dag.doc_md = __doc__
+
+t1 = QuboleFileSensor(
+    task_id='check_s3_file',
+    qubole_conn_id='qubole_default',
+    poke_interval=60,
+    timeout=600,
+    data={"files":
+              
["s3://paid-qubole/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar",
+               "s3://paid-qubole/HadoopAPITests/data/{{ ds.split('-')[2] 
}}.tsv"
+               ]  # checks for the availability of all the files in the list
+          },
+    dag=dag
+)
+
+t2 = QubolePartitionSensor(
+    task_id='check_hive_partition',
+    poke_interval=10,
+    timeout=60,
+    data={"schema":"default",
+          "table":"my_partitioned_table",
+          "columns":[
+              {"column" : "month", "values" : ["{{ ds.split('-')[1] }}"]},
+              {"column" : "day", "values" : ["{{ ds.split('-')[2] }}" , "{{ 
yesterday_ds.split('-')[2] }}"]}
+          ]  # checks for partitions like [month=12/day=12, month=12/day=13]
+          },
+    dag=dag
+)
+
+t1.set_downstream(t2)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5776375/airflow/contrib/sensors/qubole_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/qubole_sensor.py 
b/airflow/contrib/sensors/qubole_sensor.py
new file mode 100644
index 0000000..6915361
--- /dev/null
+++ b/airflow/contrib/sensors/qubole_sensor.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+from future import standard_library
+standard_library.install_aliases()
+
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.hooks.base_hook import BaseHook
+from airflow.utils.decorators import apply_defaults
+from airflow.exceptions import AirflowException
+
+from qds_sdk.qubole import Qubole
+from qds_sdk.sensors import FileSensor, PartitionSensor
+
+
+class QuboleSensor(BaseSensorOperator):
+    """
+    Base class for Qubole sensors; subclasses must set ``sensor_class``.
+
+    :param qubole_conn_id: connection whose password/host hold the Qubole API token/URL
+    :type qubole_conn_id: str
+    :param data: payload describing what must exist; passed to ``sensor_class.check()``
+    :type data: dict
+
+    .. note:: ``data`` and ``qubole_conn_id`` are templated (``.txt`` files too).
+    """
+
+    template_fields = ('data', 'qubole_conn_id')
+    template_ext = ('.txt',)
+
+    @apply_defaults
+    def __init__(self, data, qubole_conn_id="qubole_default", *args, **kwargs):
+        self.data = data
+        self.qubole_conn_id = qubole_conn_id
+
+        # Reject very frequent polling; every poke runs a Qubole check.
+        if 'poke_interval' in kwargs and kwargs['poke_interval'] < 5:
+            dag_id = getattr(kwargs.get('dag'), 'dag_id', None)
+            raise AirflowException(
+                "Sorry, poke_interval can't be less than 5 sec for "
+                "task '{0}' in dag '{1}'.".format(kwargs.get('task_id'), dag_id))
+
+        super(QuboleSensor, self).__init__(*args, **kwargs)
+
+    def poke(self, context):
+        conn = BaseHook.get_connection(self.qubole_conn_id)
+        Qubole.configure(api_token=conn.password, api_url=conn.host)
+
+        self.log.info('Poking: %s', self.data)
+
+        # A failing check is logged and reported as "not ready yet".
+        try:
+            status = self.sensor_class.check(self.data)
+        except Exception as e:
+            self.log.exception(e)
+            status = False
+
+        self.log.info('Status of this Poke: %s', status)
+
+        return status
+
+
+class QuboleFileSensor(QuboleSensor):
+    @apply_defaults
+    def __init__(self, *args, **kwargs):
+        self.sensor_class = FileSensor  # qds_sdk sensor queried by poke()
+        super(QuboleFileSensor, self).__init__(*args, **kwargs)
+
+
+class QubolePartitionSensor(QuboleSensor):
+    @apply_defaults
+    def __init__(self, *args, **kwargs):
+        self.sensor_class = PartitionSensor  # qds_sdk sensor queried by poke()
+        super(QubolePartitionSensor, self).__init__(*args, **kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5776375/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 4f1f20a..9c615ab 100644
--- a/setup.py
+++ b/setup.py
@@ -187,6 +187,7 @@ devel = [
     'nose-ignore-docstring==0.2',
     'nose-timer',
     'parameterized',
+    'qds-sdk>=1.9.6',
     'rednose',
     'paramiko',
     'requests_mock'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5776375/tests/contrib/sensors/test_qubole_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_qubole_sensor.py 
b/tests/contrib/sensors/test_qubole_sensor.py
new file mode 100644
index 0000000..035b231
--- /dev/null
+++ b/tests/contrib/sensors/test_qubole_sensor.py
@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+from mock import patch
+
+from datetime import datetime
+
+from airflow.models import DAG, Connection
+from airflow.utils import db
+from airflow.exceptions import AirflowException
+
+from airflow.contrib.sensors.qubole_sensor import QuboleFileSensor, 
QubolePartitionSensor
+
+DAG_ID = "qubole_test_dag"
+TASK_ID = "test_task"
+DEFAULT_CONN = "qubole_default"
+TEMPLATE_CONN = "my_conn_id"
+DEFAULT_DATE = datetime(2017, 1, 1)
+
+
+class QuboleSensorTest(unittest.TestCase):
+    def setUp(self):
+        db.merge_conn(
+            Connection(conn_id=DEFAULT_CONN, conn_type='HTTP'))
+
+    @patch('airflow.contrib.sensors.qubole_sensor.FileSensor.check')
+    def test_file_sensor(self, patched_check):
+        patched_check.return_value = True
+
+        sensor = QuboleFileSensor(
+            task_id='test_qubole_file_sensor',
+            data={"files":
+                    ["s3://some_bucket/some_file"]
+            }
+        )
+
+        self.assertTrue(sensor.poke({}))
+
+    @patch('airflow.contrib.sensors.qubole_sensor.PartitionSensor.check')
+    def test_partition_sensor(self, patched_check):
+        patched_check.return_value = True
+
+        sensor = QubolePartitionSensor(
+            task_id='test_qubole_partition_sensor',
+            data={"schema":"default",
+                  "table":"my_partitioned_table",
+                  "columns":[
+                      {"column" : "month", "values" : ["1", "2"]},
+                  ]
+            },
+        )
+
+        self.assertTrue(sensor.poke({}))
+
+    def test_partition_sensor_error(self):
+        # A poke_interval below 5 seconds must be rejected as soon as the
+        # sensor is constructed.
+
+        dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+
+        with self.assertRaises(AirflowException):
+            sensor = QubolePartitionSensor(
+                task_id='test_qubole_partition_sensor',
+                poke_interval=1,
+                data={"schema":"default",
+                      "table":"my_partitioned_table",
+                      "columns":[
+                          {"column" : "month", "values" : ["1", "2"]},
+                      ]
+                      },
+                dag=dag
+            )
\ No newline at end of file

Reply via email to