Repository: incubator-airflow Updated Branches: refs/heads/master e30a1f451 -> 09bbe2477
[AIRFLOW-1153] Allow HiveOperators to take hiveconfs HiveOperator can only replace variables via jinja and the replacements are global to the dag through the context and user_defined_macros. It would be much more flexible to open up hive_conf to the HiveOperator level so hive scripts can be recycled at the task level, leveraging HiveHook's already existing hive_conf param and _prepare_hiveconf function. Closes #3136 from wolfier/AIRFLOW-1153 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/09bbe247 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/09bbe247 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/09bbe247 Branch: refs/heads/master Commit: 09bbe247728993867c716635951219cc49f65dd1 Parents: e30a1f4 Author: Alan Ma <[email protected]> Authored: Mon Apr 23 18:56:29 2018 +0200 Committer: Fokko Driesprong <[email protected]> Committed: Mon Apr 23 18:56:29 2018 +0200 ---------------------------------------------------------------------- airflow/operators/hive_operator.py | 21 +++++++++++++++------ tests/operators/hive_operator.py | 10 ++++++++++ 2 files changed, 25 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/09bbe247/airflow/operators/hive_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py index 64ea61d..b62744b 100644 --- a/airflow/operators/hive_operator.py +++ b/airflow/operators/hive_operator.py @@ -35,6 +35,9 @@ class HiveOperator(BaseOperator): :type hql: string :param hive_cli_conn_id: reference to the Hive database :type hive_cli_conn_id: string + :param hiveconfs: if defined, these key value pairs will be passed + to hive as ``-hiveconf "key"="value"`` + :type hiveconfs: dict :param 
hiveconf_jinja_translate: when True, hiveconf-type templating ${var} gets translated into jinja-type templating {{ var }} and ${hiveconf:var} gets translated into jinja-type templating {{ var }}. @@ -56,7 +59,7 @@ class HiveOperator(BaseOperator): """ template_fields = ('hql', 'schema', 'hive_cli_conn_id', 'mapred_queue', - 'mapred_job_name', 'mapred_queue_priority') + 'hiveconfs', 'mapred_job_name', 'mapred_queue_priority') template_ext = ('.hql', '.sql',) ui_color = '#f0e4ec' @@ -65,6 +68,7 @@ class HiveOperator(BaseOperator): self, hql, hive_cli_conn_id='hive_cli_default', schema='default', + hiveconfs=None, hiveconf_jinja_translate=False, script_begin_tag=None, run_as_owner=False, @@ -74,15 +78,15 @@ class HiveOperator(BaseOperator): *args, **kwargs): super(HiveOperator, self).__init__(*args, **kwargs) - self.hiveconf_jinja_translate = hiveconf_jinja_translate self.hql = hql - self.schema = schema self.hive_cli_conn_id = hive_cli_conn_id + self.schema = schema + self.hiveconfs = hiveconfs or {} + self.hiveconf_jinja_translate = hiveconf_jinja_translate self.script_begin_tag = script_begin_tag self.run_as = None if run_as_owner: self.run_as = self.dag.owner - self.mapred_queue = mapred_queue self.mapred_queue_priority = mapred_queue_priority self.mapred_job_name = mapred_job_name @@ -119,8 +123,13 @@ class HiveOperator(BaseOperator): .format(ti.hostname.split('.')[0], ti.dag_id, ti.task_id, ti.execution_date.isoformat()) - self.hook.run_cli(hql=self.hql, schema=self.schema, - hive_conf=context_to_airflow_vars(context)) + if self.hiveconf_jinja_translate: + self.hiveconfs = context_to_airflow_vars(context) + else: + self.hiveconfs.update(context_to_airflow_vars(context)) + + self.log.info('Passing HiveConf: %s', self.hiveconfs) + self.hook.run_cli(hql=self.hql, schema=self.schema, hive_conf=self.hiveconfs) def dry_run(self): self.hook = self.get_hook() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/09bbe247/tests/operators/hive_operator.py 
---------------------------------------------------------------------- diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py index 9cc0f1a..0914ba9 100644 --- a/tests/operators/hive_operator.py +++ b/tests/operators/hive_operator.py @@ -98,6 +98,16 @@ class HiveOperatorTest(HiveEnvironmentTest): t.prepare_template() self.assertEqual(t.hql, "SELECT {{ num_col }} FROM {{ table }};") + def test_hiveconf(self): + hql = "SELECT * FROM ${hiveconf:table} PARTITION (${hiveconf:day});" + t = operators.hive_operator.HiveOperator( + hiveconfs={'table': 'static_babynames', 'day': '{{ ds }}'}, + task_id='dry_run_basic_hql', hql=hql, dag=self.dag) + t.prepare_template() + self.assertEqual( + t.hql, + "SELECT * FROM ${hiveconf:table} PARTITION (${hiveconf:day});") + if 'AIRFLOW_RUNALL_TESTS' in os.environ:
