Repository: incubator-airflow
Updated Branches:
  refs/heads/master 5a3f39913 -> 216beacd5

[AIRFLOW-2648] Update mapred job name in HiveOperator

Closes #3534 from yrqls21/keivn_yang_reorder_mapred

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/216beacd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/216beacd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/216beacd

Branch: refs/heads/master
Commit: 216beacd5b7695145a646611501006c770b26744
Parents: 5a3f399
Author: Kevin Yang <[email protected]>
Authored: Mon Jun 25 13:31:34 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Mon Jun 25 13:31:34 2018 +0200

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg |  3 ++
 airflow/config_templates/default_test.cfg    |  1 +
 airflow/operators/hive_operator.py           | 10 +++--
 scripts/ci/airflow_travis.cfg                |  4 +-
 tests/operators/hive_operator.py             | 47 +++++++++++++++++------
 5 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/216beacd/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index fe99ece..592c9d7 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -211,6 +211,9 @@ default_gpus = 0
 [hive]
 # Default mapreduce queue for HiveOperator tasks
 default_hive_mapred_queue =
+# Template for mapred_job_name in HiveOperator, supports the following named parameters:
+# hostname, dag_id, task_id, execution_date
+mapred_job_name_template = Airflow HiveOperator task for {{hostname}}.{{dag_id}}.{{task_id}}.{{execution_date}}
 
 [webserver]
 # The base url of your website as airflow cannot guess what domain or


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/216beacd/airflow/config_templates/default_test.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index cd4bd32..01696c6 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -64,6 +64,7 @@ default_owner = airflow
 
 [hive]
 default_hive_mapred_queue = airflow
+mapred_job_name_template = Airflow HiveOperator task for {{hostname}}.{{dag_id}}.{{task_id}}.{{execution_date}}
 
 [webserver]
 base_url = http://localhost:8080
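
Side note (not part of the commit): the doubled braces in the two cfg
templates above are str.format escapes, since those template files are
themselves rendered when the effective config is generated, so the option
value the operator reads ends up with single-brace placeholders. A minimal
sketch of how such a template string is rendered, using made-up values for
the four documented parameters:

    # Hypothetical values; hostname, dag_id, task_id and execution_date are
    # the only named parameters the template documents.
    template = ('Airflow HiveOperator task for '
                '{hostname}.{dag_id}.{task_id}.{execution_date}')
    print(template.format(hostname='worker-1',
                          dag_id='example_dag',
                          task_id='hive_task',
                          execution_date='2018-06-19T00:00:00'))
    # Airflow HiveOperator task for worker-1.example_dag.hive_task.2018-06-19T00:00:00
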

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/216beacd/airflow/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index bd72703..6b06fd4 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -22,6 +22,7 @@ from __future__ import unicode_literals
 import re
 
 from airflow.hooks.hive_hooks import HiveCliHook
+from airflow import configuration
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.operator_helpers import context_to_airflow_vars
@@ -92,6 +93,8 @@ class HiveOperator(BaseOperator):
         self.mapred_queue = mapred_queue
         self.mapred_queue_priority = mapred_queue_priority
         self.mapred_job_name = mapred_job_name
+        self.mapred_job_name_template = configuration.get('hive',
+                                                          'mapred_job_name_template')
 
         # assigned lazily - just for consistency we can create the attribute with a
         # `None` initial value, later it will be populated by the execute method.
@@ -121,9 +124,10 @@ class HiveOperator(BaseOperator):
         # set the mapred_job_name if it's not set with dag, task, execution time info
         if not self.mapred_job_name:
             ti = context['ti']
-            self.hook.mapred_job_name = 'Airflow HiveOperator task for {}.{}.{}.{}'\
-                .format(ti.hostname.split('.')[0], ti.dag_id, ti.task_id,
-                        ti.execution_date.isoformat())
+            self.hook.mapred_job_name = self.mapred_job_name_template\
+                .format(dag_id=ti.dag_id, task_id=ti.task_id,
+                        execution_date=ti.execution_date.isoformat(),
+                        hostname=ti.hostname.split('.')[0])
 
         if self.hiveconf_jinja_translate:
             self.hiveconfs = context_to_airflow_vars(context)
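
Usage note on the hunk above: the `if not self.mapred_job_name` guard is
unchanged, so an explicitly passed mapred_job_name still takes precedence
over the configured template, which execute() only formats as a fallback.
A hedged sketch (the DAG, query, and job name below are hypothetical):

    from airflow import DAG
    from airflow.operators.hive_operator import HiveOperator
    from airflow.utils import timezone

    dag = DAG(dag_id='example_dag',
              start_date=timezone.datetime(2018, 6, 19))

    # With mapred_job_name set, the [hive] mapred_job_name_template option
    # is never consulted for this task.
    t = HiveOperator(
        task_id='hive_task',
        hql='SELECT 1;',                       # hypothetical query
        mapred_job_name='my_custom_job_name',  # bypasses the template
        dag=dag)
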

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/216beacd/scripts/ci/airflow_travis.cfg
----------------------------------------------------------------------
diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg
index 140ecab..552e836 100644
--- a/scripts/ci/airflow_travis.cfg
+++ b/scripts/ci/airflow_travis.cfg
@@ -6,9 +6,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-#
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-#
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/216beacd/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 1569e7f..7d8614c 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-#
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-#
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,11 +22,15 @@ from __future__ import print_function
 import datetime
 import os
 import unittest
+
 import mock
 import nose
-import six
 
 from airflow import DAG, configuration, operators
+from airflow.models import TaskInstance
+from airflow.operators.hive_operator import HiveOperator
+from airflow.utils import timezone
+
 configuration.load_test_config()
@@ -61,7 +65,7 @@ class HiveEnvironmentTest(unittest.TestCase):
 class HiveOperatorConfigTest(HiveEnvironmentTest):
 
     def test_hive_airflow_default_config_queue(self):
-        t = operators.hive_operator.HiveOperator(
+        t = HiveOperator(
             task_id='test_default_config_queue',
             hql=self.hql,
             mapred_queue_priority='HIGH',
@@ -77,7 +81,7 @@ class HiveOperatorConfigTest(HiveEnvironmentTest):
 
     def test_hive_airflow_default_config_queue_override(self):
         specific_mapred_queue = 'default'
-        t = operators.hive_operator.HiveOperator(
+        t = HiveOperator(
             task_id='test_default_config_queue',
             hql=self.hql,
             mapred_queue=specific_mapred_queue,
@@ -92,7 +96,7 @@ class HiveOperatorTest(HiveEnvironmentTest):
 
     def test_hiveconf_jinja_translate(self):
         hql = "SELECT ${num_col} FROM ${hiveconf:table};"
-        t = operators.hive_operator.HiveOperator(
+        t = HiveOperator(
             hiveconf_jinja_translate=True, task_id='dry_run_basic_hql',
             hql=hql, dag=self.dag)
         t.prepare_template()
@@ -100,7 +104,7 @@
 
     def test_hiveconf(self):
         hql = "SELECT * FROM ${hiveconf:table} PARTITION (${hiveconf:day});"
-        t = operators.hive_operator.HiveOperator(
+        t = HiveOperator(
             hiveconfs={'table': 'static_babynames', 'day': '{{ ds }}'},
             task_id='dry_run_basic_hql',
             hql=hql, dag=self.dag)
         t.prepare_template()
@@ -108,6 +112,27 @@
             t.hql,
             "SELECT * FROM ${hiveconf:table} PARTITION (${hiveconf:day});")
 
+    @mock.patch('airflow.operators.hive_operator.HiveOperator.get_hook')
+    def test_mapred_job_name(self, mock_get_hook):
+        mock_hook = mock.MagicMock()
+        mock_get_hook.return_value = mock_hook
+        t = HiveOperator(
+            task_id='test_mapred_job_name',
+            hql=self.hql,
+            dag=self.dag)
+
+        fake_execution_date = timezone.datetime(2018, 6, 19)
+        fake_ti = TaskInstance(task=t, execution_date=fake_execution_date)
+        fake_ti.hostname = 'fake_hostname'
+        fake_context = {'ti': fake_ti}
+
+        t.execute(fake_context)
+        self.assertEqual(
+            "Airflow HiveOperator task for {}.{}.{}.{}"
+            .format(fake_ti.hostname,
+                    self.dag.dag_id, t.task_id,
+                    fake_execution_date.isoformat()), mock_hook.mapred_job_name)
+
 
 if 'AIRFLOW_RUNALL_TESTS' in os.environ:
 
@@ -117,13 +142,13 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
 
     class HivePrestoTest(HiveEnvironmentTest):
 
         def test_hive(self):
-            t = operators.hive_operator.HiveOperator(
+            t = HiveOperator(
                 task_id='basic_hql', hql=self.hql, dag=self.dag)
             t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
                   ignore_ti_state=True)
 
         def test_hive_queues(self):
-            t = operators.hive_operator.HiveOperator(
+            t = HiveOperator(
                 task_id='test_hive_queues', hql=self.hql,
                 mapred_queue='default', mapred_queue_priority='HIGH',
                 mapred_job_name='airflow.test_hive_queues',
@@ -132,12 +157,12 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                   ignore_ti_state=True)
 
         def test_hive_dryrun(self):
-            t = operators.hive_operator.HiveOperator(
+            t = HiveOperator(
                 task_id='dry_run_basic_hql', hql=self.hql, dag=self.dag)
             t.dry_run()
 
         def test_beeline(self):
-            t = operators.hive_operator.HiveOperator(
+            t = HiveOperator(
                 task_id='beeline_hql', hive_cli_conn_id='beeline_default',
                 hql=self.hql, dag=self.dag)
             t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
                   ignore_ti_state=True)
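
One detail the new test glosses over: execute() substitutes
ti.hostname.split('.')[0], so a fully qualified hostname is shortened to its
first label before formatting; the test's dot-free 'fake_hostname' therefore
passes through unchanged. A short sketch with a made-up FQDN:

    for hostname in ('worker-1.example.com', 'fake_hostname'):
        print(hostname.split('.')[0])
    # worker-1       (FQDN shortened to the first label)
    # fake_hostname  (no dot, so the value is unchanged, as in the test)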
