Repository: incubator-airflow

Updated Branches:
  refs/heads/master 07c2a515e -> b3489b99e
[AIRFLOW-1963] Add config for HiveOperator mapred_queue

Adding a configuration setting for specifying a default mapred_queue for
Hive jobs using the HiveOperator.

Closes #2915 from edgarRd/erod-hive-mapred-queue-config

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b3489b99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b3489b99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b3489b99

Branch: refs/heads/master
Commit: b3489b99e9140e25bdd08b78f57ba845c3edb358
Parents: 07c2a51
Author: Edgar Rodriguez <[email protected]>
Authored: Wed Jan 3 14:23:09 2018 -0800
Committer: Dan Davydov <[email protected]>
Committed: Wed Jan 3 14:23:14 2018 -0800

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg |  4 +-
 airflow/config_templates/default_test.cfg    |  3 +
 airflow/hooks/hive_hooks.py                  |  4 +-
 airflow/operators/hive_operator.py           | 19 ++++--
 scripts/ci/airflow_travis.cfg                |  3 +
 tests/operators/hive_operator.py             | 82 ++++++++++++++---------
 6 files changed, 77 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
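In practice this means a HiveOperator task no longer needs an explicit
mapred_queue argument when a cluster-wide default is configured. A minimal
sketch of the intended usage, assuming the new option is set in airflow.cfg;
the dag id and the queue names 'etl' and 'adhoc' are hypothetical:

    # airflow.cfg -- section added by this commit:
    #
    #   [hive]
    #   # Default mapreduce queue for HiveOperator tasks
    #   default_hive_mapred_queue = etl

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.hive_operator import HiveOperator

    dag = DAG('hive_queue_example', start_date=datetime(2018, 1, 1))

    # No mapred_queue argument: the underlying HiveCliHook falls back to
    # [hive] default_hive_mapred_queue ('etl' above).
    uses_default = HiveOperator(
        task_id='uses_default_queue',
        hql='SELECT 1;',
        dag=dag)

    # An explicit mapred_queue still takes precedence over the config default.
    overrides_default = HiveOperator(
        task_id='overrides_queue',
        hql='SELECT 1;',
        mapred_queue='adhoc',
        dag=dag)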
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index a7a3b7d..d0dfb72 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -170,6 +170,9 @@ default_ram = 512
 default_disk = 512
 default_gpus = 0
 
+[hive]
+# Default mapreduce queue for HiveOperator tasks
+default_hive_mapred_queue =
 
 [webserver]
 # The base url of your website as airflow cannot guess what domain or
@@ -458,7 +461,6 @@ keytab = airflow.keytab
 
 [github_enterprise]
 api_rev = v3
-
 [admin]
 # UI to hide sensitive variable fields when set to True
 hide_sensitive_variable_fields = True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/config_templates/default_test.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 85343ee..eaf3d03 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -51,6 +51,9 @@ auth_backend = airflow.api.auth.backend.default
 [operators]
 default_owner = airflow
 
+[hive]
+default_hive_mapred_queue = airflow
+
 [webserver]
 base_url = http://localhost:8080
 web_server_host = 0.0.0.0

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index eb39469..3d986fd 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -25,6 +25,7 @@ import time
 from tempfile import NamedTemporaryFile
 
 import hive_metastore
+from airflow import configuration as conf
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow.utils.helpers import as_flattened_list
@@ -82,7 +83,8 @@ class HiveCliHook(BaseHook):
                 "Invalid Mapred Queue Priority. Valid values are: "
                 "{}".format(', '.join(HIVE_QUEUE_PRIORITIES)))
 
-        self.mapred_queue = mapred_queue
+        self.mapred_queue = mapred_queue or conf.get('hive',
+                                                     'default_hive_mapred_queue')
         self.mapred_queue_priority = mapred_queue_priority
         self.mapred_job_name = mapred_job_name
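The hook-level change gives an explicitly passed queue precedence: the config
value is only consulted when no mapred_queue argument is supplied. A minimal
sketch of the resulting behavior, assuming the test config above
(default_hive_mapred_queue = airflow) and the stock hive_cli_default
connection being present:

    from airflow import configuration as conf
    from airflow.hooks.hive_hooks import HiveCliHook

    # No queue passed: the hook falls back to the config default.
    hook = HiveCliHook(hive_cli_conn_id='hive_cli_default')
    assert hook.mapred_queue == conf.get('hive', 'default_hive_mapred_queue')

    # An explicit queue wins over the config default.
    hook = HiveCliHook(hive_cli_conn_id='hive_cli_default',
                       mapred_queue='adhoc')  # 'adhoc' is a hypothetical queue
    assert hook.mapred_queue == 'adhoc'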
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index 221feeb..ce98544 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -77,13 +77,19 @@ class HiveOperator(BaseOperator):
         self.mapred_queue_priority = mapred_queue_priority
         self.mapred_job_name = mapred_job_name
 
+        # assigned lazily - just for consistency we can create the attribute with a
+        # `None` initial value, later it will be populated by the execute method.
+        # This also makes `on_kill` implementation consistent since it assumes `self.hook`
+        # is defined.
+        self.hook = None
+
     def get_hook(self):
         return HiveCliHook(
-                hive_cli_conn_id=self.hive_cli_conn_id,
-                run_as=self.run_as,
-                mapred_queue=self.mapred_queue,
-                mapred_queue_priority=self.mapred_queue_priority,
-                mapred_job_name=self.mapred_job_name)
+            hive_cli_conn_id=self.hive_cli_conn_id,
+            run_as=self.run_as,
+            mapred_queue=self.mapred_queue,
+            mapred_queue_priority=self.mapred_queue_priority,
+            mapred_job_name=self.mapred_job_name)
 
     def prepare_template(self):
         if self.hiveconf_jinja_translate:
@@ -103,4 +109,5 @@ class HiveOperator(BaseOperator):
         self.hook.test_hql(hql=self.hql)
 
     def on_kill(self):
-        self.hook.kill()
+        if self.hook:
+            self.hook.kill()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/scripts/ci/airflow_travis.cfg
----------------------------------------------------------------------
diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg
index c1ced74..03d7e59 100644
--- a/scripts/ci/airflow_travis.cfg
+++ b/scripts/ci/airflow_travis.cfg
@@ -31,6 +31,9 @@ base_url = http://localhost:8080
 web_server_host = 0.0.0.0
 web_server_port = 8080
 
+[hive]
+default_hive_mapred_queue = airflow
+
 [email]
 email_backend = airflow.utils.send_email_smtp
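The operator now initializes self.hook to None in the constructor, so
on_kill() is safe to call even when a task is killed before execute() ever
built the hook. The same defensive pattern in isolation; FakeHook and
QueryOperator are hypothetical names for illustration, not Airflow API:

    class FakeHook(object):
        """Stand-in for HiveCliHook, for illustration only."""
        def run(self, hql):
            print('running: %s' % hql)

        def kill(self):
            print('killing query')

    class QueryOperator(object):
        def __init__(self, hql):
            self.hql = hql
            # Assigned lazily by execute(); defining it here keeps on_kill()
            # from raising AttributeError on a never-started task.
            self.hook = None

        def execute(self):
            self.hook = FakeHook()
            self.hook.run(self.hql)

        def on_kill(self):
            # The scheduler may kill a task before execute() ran.
            if self.hook:
                self.hook.kill()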
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 69166fd..279c7ba 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -30,6 +30,57 @@ DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
 DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
 
 
+class HiveEnvironmentTest(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+        dag = DAG('test_dag_id', default_args=args)
+        self.dag = dag
+        self.hql = """
+        USE airflow;
+        DROP TABLE IF EXISTS static_babynames_partitioned;
+        CREATE TABLE IF NOT EXISTS static_babynames_partitioned (
+            state string,
+            year string,
+            name string,
+            gender string,
+            num int)
+        PARTITIONED BY (ds string);
+        INSERT OVERWRITE TABLE static_babynames_partitioned
+        PARTITION(ds='{{ ds }}')
+        SELECT state, year, name, gender, num FROM static_babynames;
+        """
+
+
+class HiveOperatorConfigTest(HiveEnvironmentTest):
+
+    def test_hive_airflow_default_config_queue(self):
+        t = operators.hive_operator.HiveOperator(
+            task_id='test_default_config_queue',
+            hql=self.hql,
+            mapred_queue_priority='HIGH',
+            mapred_job_name='airflow.test_default_config_queue',
+            dag=self.dag)
+
+        # just check that the correct default value in test_default.cfg is used
+        test_config_hive_mapred_queue = configuration.get('hive',
+                                                          'default_hive_mapred_queue')
+        self.assertEqual(t.get_hook().mapred_queue, test_config_hive_mapred_queue)
+
+    def test_hive_airflow_default_config_queue_override(self):
+        specific_mapred_queue = 'default'
+        t = operators.hive_operator.HiveOperator(
+            task_id='test_default_config_queue',
+            hql=self.hql,
+            mapred_queue=specific_mapred_queue,
+            mapred_queue_priority='HIGH',
+            mapred_job_name='airflow.test_default_config_queue',
+            dag=self.dag)
+
+        self.assertEqual(t.get_hook().mapred_queue, specific_mapred_queue)
+
+
 if 'AIRFLOW_RUNALL_TESTS' in os.environ:
 
     import airflow.hooks.hive_hooks
@@ -148,37 +199,15 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             self.assertEqual(sql, args[0])
             self.assertEqual(self.nondefault_schema, kwargs['schema'])
 
-    class HivePrestoTest(unittest.TestCase):
-
-        def setUp(self):
-            configuration.load_test_config()
-            args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
-            dag = DAG('test_dag_id', default_args=args)
-            self.dag = dag
-            self.hql = """
-            USE airflow;
-            DROP TABLE IF EXISTS static_babynames_partitioned;
-            CREATE TABLE IF NOT EXISTS static_babynames_partitioned (
-                state string,
-                year string,
-                name string,
-                gender string,
-                num int)
-            PARTITIONED BY (ds string);
-            INSERT OVERWRITE TABLE static_babynames_partitioned
-            PARTITION(ds='{{ ds }}')
-            SELECT state, year, name, gender, num FROM static_babynames;
-            """
+    class HivePrestoTest(HiveEnvironmentTest):
 
         def test_hive(self):
-            import airflow.operators.hive_operator
             t = operators.hive_operator.HiveOperator(
                 task_id='basic_hql', hql=self.hql, dag=self.dag)
             t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
                   ignore_ti_state=True)
 
         def test_hive_queues(self):
-            import airflow.operators.hive_operator
             t = operators.hive_operator.HiveOperator(
                 task_id='test_hive_queues', hql=self.hql,
                 mapred_queue='default', mapred_queue_priority='HIGH',
@@ -188,13 +217,11 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                   ignore_ti_state=True)
 
         def test_hive_dryrun(self):
-            import airflow.operators.hive_operator
             t = operators.hive_operator.HiveOperator(
                 task_id='dry_run_basic_hql', hql=self.hql, dag=self.dag)
             t.dry_run()
 
         def test_beeline(self):
-            import airflow.operators.hive_operator
             t = operators.hive_operator.HiveOperator(
                 task_id='beeline_hql', hive_cli_conn_id='beeline_default',
                 hql=self.hql, dag=self.dag)
@@ -205,14 +232,12 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             sql = """
             SELECT count(1) FROM airflow.static_babynames_partitioned;
             """
-            import airflow.operators.presto_check_operator
             t = operators.presto_check_operator.PrestoCheckOperator(
                 task_id='presto_check', sql=sql, dag=self.dag)
             t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
                   ignore_ti_state=True)
 
         def test_presto_to_mysql(self):
-            import airflow.operators.presto_to_mysql
             t = operators.presto_to_mysql.PrestoToMySqlTransfer(
                 task_id='presto_to_mysql_check',
                 sql="""
@@ -253,7 +278,6 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                   ignore_ti_state=True)
 
         def test_hive_stats(self):
-            import airflow.operators.hive_stats_operator
             t = operators.hive_stats_operator.HiveStatsCollectionOperator(
                 task_id='hive_stats_check',
                 table="airflow.static_babynames_partitioned",
@@ -322,7 +346,6 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                   ignore_ti_state=True)
 
         def test_hive2samba(self):
-            import airflow.operators.hive_to_samba_operator
             t = operators.hive_to_samba_operator.Hive2SambaOperator(
                 task_id='hive2samba_check',
                 samba_conn_id='tableau_samba',
@@ -333,7 +356,6 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                   ignore_ti_state=True)
 
         def test_hive_to_mysql(self):
-            import airflow.operators.hive_to_mysql
             t = operators.hive_to_mysql.HiveToMySqlTransfer(
                 mysql_conn_id='airflow_db',
                 task_id='hive_to_mysql_check',
