Repository: incubator-airflow
Updated Branches:
  refs/heads/master e30a1f451 -> 09bbe2477


[AIRFLOW-1153] Allow HiveOperators to take hiveconfs

HiveOperator can only replace variables via jinja
and the replacements
are global to the dag through the context and
user_defined_macros.
It would be much more flexible to open up
hive_conf to the HiveOperator
level so hive scripts can be recycled at the task
level, leveraging
HiveHook's already existing hive_conf param and
_prepare_hiveconf
function.

Closes #3136 from wolfier/AIRFLOW-1153


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/09bbe247
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/09bbe247
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/09bbe247

Branch: refs/heads/master
Commit: 09bbe247728993867c716635951219cc49f65dd1
Parents: e30a1f4
Author: Alan Ma <[email protected]>
Authored: Mon Apr 23 18:56:29 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Mon Apr 23 18:56:29 2018 +0200

----------------------------------------------------------------------
 airflow/operators/hive_operator.py | 21 +++++++++++++++------
 tests/operators/hive_operator.py   | 10 ++++++++++
 2 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/09bbe247/airflow/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_operator.py 
b/airflow/operators/hive_operator.py
index 64ea61d..b62744b 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -35,6 +35,9 @@ class HiveOperator(BaseOperator):
     :type hql: string
     :param hive_cli_conn_id: reference to the Hive database
     :type hive_cli_conn_id: string
+    :param hiveconfs: if defined, these key value pairs will be passed
+        to hive as ``-hiveconf "key"="value"``
+    :type hiveconfs: dict
     :param hiveconf_jinja_translate: when True, hiveconf-type templating
         ${var} gets translated into jinja-type templating {{ var }} and
         ${hiveconf:var} gets translated into jinja-type templating {{ var }}.
@@ -56,7 +59,7 @@ class HiveOperator(BaseOperator):
     """
 
     template_fields = ('hql', 'schema', 'hive_cli_conn_id', 'mapred_queue',
-                       'mapred_job_name', 'mapred_queue_priority')
+                       'hiveconfs', 'mapred_job_name', 'mapred_queue_priority')
     template_ext = ('.hql', '.sql',)
     ui_color = '#f0e4ec'
 
@@ -65,6 +68,7 @@ class HiveOperator(BaseOperator):
             self, hql,
             hive_cli_conn_id='hive_cli_default',
             schema='default',
+            hiveconfs=None,
             hiveconf_jinja_translate=False,
             script_begin_tag=None,
             run_as_owner=False,
@@ -74,15 +78,15 @@ class HiveOperator(BaseOperator):
             *args, **kwargs):
 
         super(HiveOperator, self).__init__(*args, **kwargs)
-        self.hiveconf_jinja_translate = hiveconf_jinja_translate
         self.hql = hql
-        self.schema = schema
         self.hive_cli_conn_id = hive_cli_conn_id
+        self.schema = schema
+        self.hiveconfs = hiveconfs or {}
+        self.hiveconf_jinja_translate = hiveconf_jinja_translate
         self.script_begin_tag = script_begin_tag
         self.run_as = None
         if run_as_owner:
             self.run_as = self.dag.owner
-
         self.mapred_queue = mapred_queue
         self.mapred_queue_priority = mapred_queue_priority
         self.mapred_job_name = mapred_job_name
@@ -119,8 +123,13 @@ class HiveOperator(BaseOperator):
                 .format(ti.hostname.split('.')[0], ti.dag_id, ti.task_id,
                         ti.execution_date.isoformat())
 
-        self.hook.run_cli(hql=self.hql, schema=self.schema,
-                          hive_conf=context_to_airflow_vars(context))
+        if self.hiveconf_jinja_translate:
+            self.hiveconfs = context_to_airflow_vars(context)
+        else:
+            self.hiveconfs.update(context_to_airflow_vars(context))
+
+        self.log.info('Passing HiveConf: %s', self.hiveconfs)
+        self.hook.run_cli(hql=self.hql, schema=self.schema, 
hive_conf=self.hiveconfs)
 
     def dry_run(self):
         self.hook = self.get_hook()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/09bbe247/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 9cc0f1a..0914ba9 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -98,6 +98,16 @@ class HiveOperatorTest(HiveEnvironmentTest):
         t.prepare_template()
         self.assertEqual(t.hql, "SELECT {{ num_col }} FROM {{ table }};")
 
+    def test_hiveconf(self):
+        hql = "SELECT * FROM ${hiveconf:table} PARTITION (${hiveconf:day});"
+        t = operators.hive_operator.HiveOperator(
+            hiveconfs={'table': 'static_babynames', 'day': '{{ ds }}'},
+            task_id='dry_run_basic_hql', hql=hql, dag=self.dag)
+        t.prepare_template()
+        self.assertEqual(
+            t.hql,
+            "SELECT * FROM ${hiveconf:table} PARTITION (${hiveconf:day});")
+
 
 if 'AIRFLOW_RUNALL_TESTS' in os.environ:
 

Reply via email to