Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6b890d157 -> 6be02475f
[AIRFLOW-1192] Some enhancements to qubole_operator 1. Upgrade qds_sdk version to latest 2. Add support to run Zeppelin Notebooks 3. Move out initialization of QuboleHook from init() Closes #2322 from msumit/AIRFLOW-1192 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6be02475 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6be02475 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6be02475 Branch: refs/heads/master Commit: 6be02475f80c2f493e640272ab5344ca686204a0 Parents: 6b890d1 Author: Sumit Maheshwari <[email protected]> Authored: Wed Jun 7 09:09:50 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Wed Jun 7 09:09:50 2017 +0200 ---------------------------------------------------------------------- airflow/contrib/hooks/qubole_hook.py | 4 +- airflow/contrib/operators/qubole_operator.py | 30 ++++--- scripts/ci/requirements.txt | 1 + setup.py | 2 +- tests/contrib/operators/test_qubole_operator.py | 94 ++++++++++++++++++++ 5 files changed, 115 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/airflow/contrib/hooks/qubole_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py index 93db72a..c51a757 100755 --- a/airflow/contrib/hooks/qubole_hook.py +++ b/airflow/contrib/hooks/qubole_hook.py @@ -42,7 +42,7 @@ COMMAND_CLASSES = { "dbimportcmd": DbImportCommand } -HYPHEN_ARGS = ['cluster_label', 'app_id'] +HYPHEN_ARGS = ['cluster_label', 'app_id', 'note_id'] POSITIONAL_ARGS = ['sub_command', 'parameters'] @@ -57,7 +57,7 @@ COMMAND_ARGS = { 'name'], 'dbtapquerycmd': ['db_tap_id', 'query', 'macros', 'tags', 'name'], 'sparkcmd': ['program', 'cmdline', 'sql', 'script_location', 'macros', 
'tags', - 'cluster_label', 'language', 'app_id', 'name', 'arguments', + 'cluster_label', 'language', 'app_id', 'name', 'arguments', 'note_id', 'user_program_arguments'], 'dbexportcmd': ['mode', 'hive_table', 'partition_spec', 'dbtap_id', 'db_table', 'db_update_mode', 'db_update_keys', 'export_dir', http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/airflow/contrib/operators/qubole_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py index 623899d..a5e9f5e 100755 --- a/airflow/contrib/operators/qubole_operator.py +++ b/airflow/contrib/operators/qubole_operator.py @@ -102,8 +102,9 @@ class QuboleOperator(BaseOperator): ``sub_command``, ``script``, ``files``, ``archives``, ``program``, ``cmdline``, ``sql``, ``where_clause``, ``extract_query``, ``boundary_query``, ``macros``, ``tags``, ``name``, ``parameters``, ``dbtap_id``, ``hive_table``, ``db_table``, - ``split_column``, ``db_update_keys``, ``export_dir``, ``partition_spec``. You - can also use ``.txt`` files for template driven use cases. + ``split_column``, ``note_id``, ``db_update_keys``, ``export_dir``, + ``partition_spec``, ``qubole_conn_id``, ``arguments``, ``user_program_arguments``. + You can also use ``.txt`` files for template driven use cases. .. 
note:: In QuboleOperator there is a default handler for task failures and retries, which generally kills the command running at QDS for the corresponding task @@ -114,8 +115,10 @@ class QuboleOperator(BaseOperator): template_fields = ('query', 'script_location', 'sub_command', 'script', 'files', 'archives', 'program', 'cmdline', 'sql', 'where_clause', 'tags', 'extract_query', 'boundary_query', 'macros', 'name', 'parameters', - 'dbtap_id', 'hive_table', 'db_table', 'split_column', - 'db_update_keys', 'export_dir', 'partition_spec') + 'dbtap_id', 'hive_table', 'db_table', 'split_column', 'note_id', + 'db_update_keys', 'export_dir', 'partition_spec', 'qubole_conn_id', + 'arguments', 'user_program_arguments') + template_ext = ('.txt',) ui_color = '#3064A1' ui_fgcolor = '#fff' @@ -125,7 +128,6 @@ class QuboleOperator(BaseOperator): self.args = args self.kwargs = kwargs self.kwargs['qubole_conn_id'] = qubole_conn_id - self.hook = QuboleHook(*self.args, **self.kwargs) super(QuboleOperator, self).__init__(*args, **kwargs) if self.on_failure_callback is None: @@ -135,21 +137,23 @@ class QuboleOperator(BaseOperator): self.on_retry_callback = QuboleHook.handle_failure_retry def execute(self, context): - # Reinitiating the hook, as some template fields might have changed - self.hook = QuboleHook(*self.args, **self.kwargs) - return self.hook.execute(context) + return self.get_hook().execute(context) - def on_kill(self, ti): - self.hook.kill(ti) + def on_kill(self, ti=None): + self.get_hook().kill(ti) def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True): - return self.hook.get_results(ti, fp, inline, delim, fetch) + return self.get_hook().get_results(ti, fp, inline, delim, fetch) def get_log(self, ti): - return self.hook.get_log(ti) + return self.get_hook().get_log(ti) def get_jobs_id(self, ti): - return self.hook.get_jobs_id(ti) + return self.get_hook().get_jobs_id(ti) + + def get_hook(self): + # Reinitiating the hook, as some template fields might have 
changed + return QuboleHook(*self.args, **self.kwargs) def __getattribute__(self, name): if name in QuboleOperator.template_fields: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/scripts/ci/requirements.txt ---------------------------------------------------------------------- diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt index bf2d386..5b0612e 100644 --- a/scripts/ci/requirements.txt +++ b/scripts/ci/requirements.txt @@ -73,6 +73,7 @@ PyOpenSSL PySmbClient python-daemon python-dateutil +qds-sdk>=1.9.6 redis rednose requests http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py index 71c3f49..527ee5d 100644 --- a/setup.py +++ b/setup.py @@ -179,7 +179,7 @@ password = [ 'flask-bcrypt>=0.7.1', ] github_enterprise = ['Flask-OAuthlib>=0.9.1'] -qds = ['qds-sdk>=1.9.0'] +qds = ['qds-sdk>=1.9.6'] cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x redis = ['redis>=2.10.5'] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/tests/contrib/operators/test_qubole_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_qubole_operator.py b/tests/contrib/operators/test_qubole_operator.py new file mode 100644 index 0000000..0e6e13d --- /dev/null +++ b/tests/contrib/operators/test_qubole_operator.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest +from datetime import datetime + +from airflow.models import DAG, Connection +from airflow.utils import db + +from airflow.contrib.hooks.qubole_hook import QuboleHook +from airflow.contrib.operators.qubole_operator import QuboleOperator + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + +DAG_ID="qubole_test_dag" +TASK_ID="test_task" +DEFAULT_CONN="qubole_default" +TEMPLATE_CONN = "my_conn_id" +DEFAULT_DATE = datetime(2017, 1, 1) + + +class QuboleOperatorTest(unittest.TestCase): + def setUp(self): + db.merge_conn( + Connection(conn_id=DEFAULT_CONN, conn_type='HTTP')) + + def test_init_with_default_connection(self): + op = QuboleOperator(task_id=TASK_ID) + self.assertEqual(op.task_id, TASK_ID) + self.assertEqual(op.qubole_conn_id, DEFAULT_CONN) + + def test_init_with_template_connection(self): + dag = DAG(DAG_ID, start_date=DEFAULT_DATE) + + with dag: + task = QuboleOperator(task_id=TASK_ID, dag=dag, + qubole_conn_id="{{ dag_run.conf['qubole_conn_id'] }}") + + result = task.render_template('qubole_conn_id', "{{ qubole_conn_id }}", + {'qubole_conn_id' : TEMPLATE_CONN}) + self.assertEqual(task.task_id, TASK_ID) + self.assertEqual(result, TEMPLATE_CONN) + + def test_get_hook(self): + dag = DAG(DAG_ID, start_date=DEFAULT_DATE) + + with dag: + task = QuboleOperator(task_id=TASK_ID, command_type='hivecmd', dag=dag) + + hook = task.get_hook() + self.assertEqual(hook.__class__, QuboleHook) + + def test_hyphen_args_note_id(self): + dag = DAG(DAG_ID, start_date=DEFAULT_DATE) + + with dag: + task = QuboleOperator(task_id=TASK_ID, command_type='sparkcmd', + note_id="123", dag=dag) + self.assertEqual(task.get_hook().create_cmd_args({'run_id':'dummy'})[0], + "--note-id=123") + + def test_position_args_parameters(self): + dag = DAG(DAG_ID, start_date=DEFAULT_DATE) + + with dag: + task = 
QuboleOperator(task_id=TASK_ID, command_type='pigcmd', + parameters="key1=value1 key2=value2", dag=dag) + + self.assertEqual(task.get_hook().create_cmd_args({'run_id':'dummy'})[1], + "key1=value1") + self.assertEqual(task.get_hook().create_cmd_args({'run_id':'dummy'})[2], + "key2=value2") + + + +
