Repository: incubator-airflow
Updated Branches:
  refs/heads/master 5a3f39913 -> 216beacd5


[AIRFLOW-2648] Update mapred job name in HiveOperator

Closes #3534 from yrqls21/keivn_yang_reorder_mapred


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/216beacd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/216beacd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/216beacd

Branch: refs/heads/master
Commit: 216beacd5b7695145a646611501006c770b26744
Parents: 5a3f399
Author: Kevin Yang <[email protected]>
Authored: Mon Jun 25 13:31:34 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Mon Jun 25 13:31:34 2018 +0200

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg |  3 ++
 airflow/config_templates/default_test.cfg    |  1 +
 airflow/operators/hive_operator.py           | 10 +++--
 scripts/ci/airflow_travis.cfg                |  4 +-
 tests/operators/hive_operator.py             | 47 +++++++++++++++++------
 5 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/216beacd/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index fe99ece..592c9d7 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -211,6 +211,9 @@ default_gpus = 0
 [hive]
 # Default mapreduce queue for HiveOperator tasks
 default_hive_mapred_queue =
+# Template for mapred_job_name in HiveOperator, supports the following named parameters:
+# hostname, dag_id, task_id, execution_date
+mapred_job_name_template = Airflow HiveOperator task for {{hostname}}.{{dag_id}}.{{task_id}}.{{execution_date}}
 
 [webserver]
 # The base url of your website as airflow cannot guess what domain or

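A note on the new key: the doubled braces above escape the str.format expansion
that Airflow applies to default_airflow.cfg, so the value the operator reads
back has single-brace named fields. A minimal sketch of the rendering, with
illustrative values (at runtime they come from the TaskInstance, as the
hive_operator.py hunk below shows):

    template = ('Airflow HiveOperator task for '
                '{hostname}.{dag_id}.{task_id}.{execution_date}')
    job_name = template.format(
        hostname='worker-1',                   # ti.hostname.split('.')[0]
        dag_id='example_dag',                  # ti.dag_id
        task_id='run_hql',                     # ti.task_id
        execution_date='2018-06-19T00:00:00')  # ti.execution_date.isoformat()
    # -> Airflow HiveOperator task for worker-1.example_dag.run_hql.2018-06-19T00:00:00
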
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/216beacd/airflow/config_templates/default_test.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index cd4bd32..01696c6 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -64,6 +64,7 @@ default_owner = airflow
 
 [hive]
 default_hive_mapred_queue = airflow
+mapred_job_name_template = Airflow HiveOperator task for {{hostname}}.{{dag_id}}.{{task_id}}.{{execution_date}}
 
 [webserver]
 base_url = http://localhost:8080

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/216beacd/airflow/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index bd72703..6b06fd4 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -22,6 +22,7 @@ from __future__ import unicode_literals
 import re
 
 from airflow.hooks.hive_hooks import HiveCliHook
+from airflow import configuration
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.operator_helpers import context_to_airflow_vars
@@ -92,6 +93,8 @@ class HiveOperator(BaseOperator):
         self.mapred_queue = mapred_queue
         self.mapred_queue_priority = mapred_queue_priority
         self.mapred_job_name = mapred_job_name
+        self.mapred_job_name_template = configuration.get('hive',
+                                                          'mapred_job_name_template')
 
         # assigned lazily - just for consistency we can create the attribute with a
         # `None` initial value, later it will be populated by the execute method.
@@ -121,9 +124,10 @@ class HiveOperator(BaseOperator):
         # set the mapred_job_name if it's not set with dag, task, execution time info
         if not self.mapred_job_name:
             ti = context['ti']
-            self.hook.mapred_job_name = 'Airflow HiveOperator task for {}.{}.{}.{}'\
-                .format(ti.hostname.split('.')[0], ti.dag_id, ti.task_id,
-                        ti.execution_date.isoformat())
+            self.hook.mapred_job_name = self.mapred_job_name_template\
+                .format(dag_id=ti.dag_id, task_id=ti.task_id,
+                        execution_date=ti.execution_date.isoformat(),
+                        hostname=ti.hostname.split('.')[0])
 
         if self.hiveconf_jinja_translate:
             self.hiveconfs = context_to_airflow_vars(context)

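The switch above from positional '{}' placeholders to named fields is what
makes the template safely configurable: named fields can be reordered or
omitted without breaking the .format() call. A small illustration with a
hypothetical deployment-specific template:

    # Fields may be reordered or dropped; the old '{}.{}.{}.{}' form allowed neither.
    custom = '{dag_id}.{task_id} on {hostname} at {execution_date}'
    print(custom.format(hostname='worker-1', dag_id='example_dag',
                        task_id='run_hql',
                        execution_date='2018-06-19T00:00:00'))
    # -> example_dag.run_hql on worker-1 at 2018-06-19T00:00:00
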
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/216beacd/scripts/ci/airflow_travis.cfg
----------------------------------------------------------------------
diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg
index 140ecab..552e836 100644
--- a/scripts/ci/airflow_travis.cfg
+++ b/scripts/ci/airflow_travis.cfg
@@ -6,9 +6,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/216beacd/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 1569e7f..7d8614c 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,11 +22,15 @@ from __future__ import print_function
 import datetime
 import os
 import unittest
+
 import mock
 import nose
-import six
 
 from airflow import DAG, configuration, operators
+from airflow.models import TaskInstance
+from airflow.operators.hive_operator import HiveOperator
+from airflow.utils import timezone
+
 configuration.load_test_config()
 
 
@@ -61,7 +65,7 @@ class HiveEnvironmentTest(unittest.TestCase):
 class HiveOperatorConfigTest(HiveEnvironmentTest):
 
     def test_hive_airflow_default_config_queue(self):
-        t = operators.hive_operator.HiveOperator(
+        t = HiveOperator(
             task_id='test_default_config_queue',
             hql=self.hql,
             mapred_queue_priority='HIGH',
@@ -77,7 +81,7 @@ class HiveOperatorConfigTest(HiveEnvironmentTest):
 
     def test_hive_airflow_default_config_queue_override(self):
         specific_mapred_queue = 'default'
-        t = operators.hive_operator.HiveOperator(
+        t = HiveOperator(
             task_id='test_default_config_queue',
             hql=self.hql,
             mapred_queue=specific_mapred_queue,
@@ -92,7 +96,7 @@ class HiveOperatorTest(HiveEnvironmentTest):
 
     def test_hiveconf_jinja_translate(self):
         hql = "SELECT ${num_col} FROM ${hiveconf:table};"
-        t = operators.hive_operator.HiveOperator(
+        t = HiveOperator(
             hiveconf_jinja_translate=True,
             task_id='dry_run_basic_hql', hql=hql, dag=self.dag)
         t.prepare_template()
@@ -100,7 +104,7 @@ class HiveOperatorTest(HiveEnvironmentTest):
 
     def test_hiveconf(self):
         hql = "SELECT * FROM ${hiveconf:table} PARTITION (${hiveconf:day});"
-        t = operators.hive_operator.HiveOperator(
+        t = HiveOperator(
             hiveconfs={'table': 'static_babynames', 'day': '{{ ds }}'},
             task_id='dry_run_basic_hql', hql=hql, dag=self.dag)
         t.prepare_template()
@@ -108,6 +112,27 @@ class HiveOperatorTest(HiveEnvironmentTest):
             t.hql,
             "SELECT * FROM ${hiveconf:table} PARTITION (${hiveconf:day});")
 
+    @mock.patch('airflow.operators.hive_operator.HiveOperator.get_hook')
+    def test_mapred_job_name(self, mock_get_hook):
+        mock_hook = mock.MagicMock()
+        mock_get_hook.return_value = mock_hook
+        t = HiveOperator(
+            task_id='test_mapred_job_name',
+            hql=self.hql,
+            dag=self.dag)
+
+        fake_execution_date = timezone.datetime(2018, 6, 19)
+        fake_ti = TaskInstance(task=t, execution_date=fake_execution_date)
+        fake_ti.hostname = 'fake_hostname'
+        fake_context = {'ti': fake_ti}
+
+        t.execute(fake_context)
+        self.assertEqual(
+            "Airflow HiveOperator task for {}.{}.{}.{}"
+            .format(fake_ti.hostname,
+                    self.dag.dag_id, t.task_id,
+                    fake_execution_date.isoformat()), mock_hook.mapred_job_name)
+
 
 if 'AIRFLOW_RUNALL_TESTS' in os.environ:
 
@@ -117,13 +142,13 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
     class HivePrestoTest(HiveEnvironmentTest):
 
         def test_hive(self):
-            t = operators.hive_operator.HiveOperator(
+            t = HiveOperator(
                 task_id='basic_hql', hql=self.hql, dag=self.dag)
             t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
                   ignore_ti_state=True)
 
         def test_hive_queues(self):
-            t = operators.hive_operator.HiveOperator(
+            t = HiveOperator(
                 task_id='test_hive_queues', hql=self.hql,
                 mapred_queue='default', mapred_queue_priority='HIGH',
                 mapred_job_name='airflow.test_hive_queues',
@@ -132,12 +157,12 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                   ignore_ti_state=True)
 
         def test_hive_dryrun(self):
-            t = operators.hive_operator.HiveOperator(
+            t = HiveOperator(
                 task_id='dry_run_basic_hql', hql=self.hql, dag=self.dag)
             t.dry_run()
 
         def test_beeline(self):
-            t = operators.hive_operator.HiveOperator(
+            t = HiveOperator(
                 task_id='beeline_hql', hive_cli_conn_id='beeline_default',
                 hql=self.hql, dag=self.dag)
             t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,

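To override the generated job name in a deployment, the new key goes under the
[hive] section of airflow.cfg. A sketch (the section and key come from the diff
above; the value is only an example, and the braces appear single here because
only the shipped template file is run through str.format):

    [hive]
    # Any subset of hostname, dag_id, task_id, execution_date may be referenced.
    mapred_job_name_template = airflow.{dag_id}.{task_id}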