Repository: incubator-airflow
Updated Branches:
  refs/heads/master c208a41fc -> eb994d683
[AIRFLOW-1770] Allow HiveOperator to take in a file

Clarify and upgrade HiveOperator. Document that the hql parameter may also be a path, relative to the DAG file, to a Hive script (templated or not). Add the ability to template hiveconf variables. Add a default value for the MapReduce job name, and also pass the queue through the hiveconf variables mapred.job.queue.name and tez.job.queue.name.

Closes #2752 from wolfier/AIRFLOW-1770

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb994d68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb994d68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb994d68

Branch: refs/heads/master
Commit: eb994d683f244f63dd191a6640baaee66ffc8e29
Parents: c208a41
Author: Alan Ma <[email protected]>
Authored: Fri Jan 12 11:01:11 2018 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Fri Jan 12 11:01:11 2018 +0100

----------------------------------------------------------------------
 airflow/hooks/hive_hooks.py        |  9 ++++++++-
 airflow/operators/hive_operator.py | 23 +++++++++++++++++------
 tests/operators/hive_operator.py   | 11 +++++++++++
 3 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb994d68/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 3d986fd..47aebc8 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -183,7 +183,14 @@ class HiveCliHook(BaseHook): hive_conf_params.extend( ['-hiveconf', 'mapreduce.job.queuename={}' - .format(self.mapred_queue)]) + .format(self.mapred_queue), + '-hiveconf', + 'mapred.job.queue.name={}' + .format(self.mapred_queue), + '-hiveconf', + 'tez.job.queue.name={}' + .format(self.mapred_queue) + ]) if self.mapred_queue_priority: 
hive_conf_params.extend( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb994d68/airflow/operators/hive_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py index ce98544..ffb98ac 100644 --- a/airflow/operators/hive_operator.py +++ b/airflow/operators/hive_operator.py @@ -21,15 +21,17 @@ from airflow.utils.operator_helpers import context_to_airflow_vars class HiveOperator(BaseOperator): """ - Executes hql code in a specific Hive database. + Executes hql code or hive script in a specific Hive database. - :param hql: the hql to be executed + :param hql: the hql to be executed. Note that you may also use + a relative path from the dag file of a (template) hive script. :type hql: string :param hive_cli_conn_id: reference to the Hive database :type hive_cli_conn_id: string :param hiveconf_jinja_translate: when True, hiveconf-type templating - ${var} gets translated into jinja-type templating {{ var }}. Note that - you may want to use this along with the + ${var} gets translated into jinja-type templating {{ var }} and + ${hiveconf:var} gets translated into jinja-type templating {{ var }}. + Note that you may want to use this along with the ``DAG(user_defined_macros=myargs)`` parameter. View the DAG object documentation for more details. 
:type hiveconf_jinja_translate: boolean @@ -46,7 +48,8 @@ class HiveOperator(BaseOperator): :type mapred_job_name: string """ - template_fields = ('hql', 'schema') + template_fields = ('hql', 'schema', 'hive_cli_conn_id', 'mapred_queue', + 'mapred_job_name', 'mapred_queue_priority') template_ext = ('.hql', '.sql',) ui_color = '#f0e4ec' @@ -94,13 +97,21 @@ class HiveOperator(BaseOperator): def prepare_template(self): if self.hiveconf_jinja_translate: self.hql = re.sub( - "(\$\{([ a-zA-Z0-9_]*)\})", "{{ \g<2> }}", self.hql) + "(\$\{(hiveconf:)?([ a-zA-Z0-9_]*)\})", "{{ \g<3> }}", self.hql) if self.script_begin_tag and self.script_begin_tag in self.hql: self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:]) def execute(self, context): self.log.info('Executing: %s', self.hql) self.hook = self.get_hook() + + # set the mapred_job_name if it's not set with dag, task, execution time info + if not self.mapred_job_name: + ti = context['ti'] + self.hook.mapred_job_name = 'Airflow HiveOperator task for {}.{}.{}.{}'\ + .format(ti.hostname.split('.')[0], ti.dag_id, ti.task_id, + ti.execution_date.isoformat()) + self.hook.run_cli(hql=self.hql, schema=self.schema, hive_conf=context_to_airflow_vars(context)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb994d68/tests/operators/hive_operator.py ---------------------------------------------------------------------- diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py index 279c7ba..e547d4a 100644 --- a/tests/operators/hive_operator.py +++ b/tests/operators/hive_operator.py @@ -81,6 +81,17 @@ class HiveOperatorConfigTest(HiveEnvironmentTest): self.assertEqual(t.get_hook().mapred_queue, specific_mapred_queue) +class HiveOperatorTest(HiveEnvironmentTest): + + def test_hiveconf_jinja_translate(self): + hql = "SELECT ${num_col} FROM ${hiveconf:table};" + t = operators.hive_operator.HiveOperator( + hiveconf_jinja_translate=True, + task_id='dry_run_basic_hql', hql=hql, dag=self.dag) + 
t.prepare_template() + self.assertEqual(t.hql, "SELECT {{ num_col }} FROM {{ table }};") + + if 'AIRFLOW_RUNALL_TESTS' in os.environ: import airflow.hooks.hive_hooks
