Repository: incubator-airflow
Updated Branches:
  refs/heads/master ef8a6ca4e -> 08a18395e
[AIRFLOW-2586] Stop getting AIRFLOW_HOME value from config file in bash operator Closes #3484 from yrqls21/kevin_yang_fix_bash_operator Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/08a18395 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/08a18395 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/08a18395 Branch: refs/heads/master Commit: 08a18395e71d9f0f2353b23c0e0112c0ed81703a Parents: ef8a6ca Author: Kevin Yang <[email protected]> Authored: Fri Jun 15 13:32:04 2018 +0200 Committer: Fokko Driesprong <[email protected]> Committed: Fri Jun 15 13:32:04 2018 +0200 ---------------------------------------------------------------------- airflow/operators/bash_operator.py | 22 ++-------- airflow/utils/dates.py | 32 +++++++-------- tests/operators/bash_operator.py | 72 +++++++++++++++++++++++++++++++++ 3 files changed, 91 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/08a18395/airflow/operators/bash_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py index 53a68a7..37a19db 100644 --- a/airflow/operators/bash_operator.py +++ b/airflow/operators/bash_operator.py @@ -24,22 +24,12 @@ import signal from subprocess import Popen, STDOUT, PIPE from tempfile import gettempdir, NamedTemporaryFile -from airflow import configuration as conf from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.utils.file import TemporaryDirectory -# These variables are required in cases when BashOperator tasks use airflow specific code, -# e.g. 
they import packages in the airflow context and the possibility of impersonation -# gives not guarantee that these variables are available in the impersonated environment. -# Hence, we need to propagate them in the Bash script used as a wrapper of commands in -# this BashOperator. -PYTHONPATH_VAR = 'PYTHONPATH' -AIRFLOW_HOME_VAR = 'AIRFLOW_HOME' - - class BashOperator(BaseOperator): """ Execute a Bash script, command or set of commands. @@ -83,18 +73,12 @@ class BashOperator(BaseOperator): """ self.log.info("Tmp dir root location: \n %s", gettempdir()) - airflow_home_value = conf.get('core', AIRFLOW_HOME_VAR) - pythonpath_value = os.environ.get(PYTHONPATH_VAR, '') - - bash_command = ('export {}={}; '.format(AIRFLOW_HOME_VAR, airflow_home_value) + - 'export {}={}; '.format(PYTHONPATH_VAR, pythonpath_value) + - self.bash_command) - self.lineage_data = bash_command + self.lineage_data = self.bash_command with TemporaryDirectory(prefix='airflowtmp') as tmp_dir: with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f: - f.write(bytes(bash_command, 'utf_8')) + f.write(bytes(self.bash_command, 'utf_8')) f.flush() fname = f.name script_location = os.path.abspath(fname) @@ -110,7 +94,7 @@ class BashOperator(BaseOperator): signal.signal(getattr(signal, sig), signal.SIG_DFL) os.setsid() - self.log.info("Running command: %s", bash_command) + self.log.info("Running command: %s", self.bash_command) sp = Popen( ['bash', fname], stdout=PIPE, stderr=STDOUT, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/08a18395/airflow/utils/dates.py ---------------------------------------------------------------------- diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py index 090bef9..c147a65 100644 --- a/airflow/utils/dates.py +++ b/airflow/utils/dates.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -48,7 +48,6 @@ def date_range( Get a set of dates as a list based on a start, end and delta, delta can be something that can be added to ``datetime.datetime`` or a cron expression as a ``str`` - :param start_date: anchor date to start the series from :type start_date: datetime.datetime :param end_date: right boundary for the date range @@ -57,13 +56,15 @@ def date_range( number of entries you want in the range. This number can be negative, output will always be sorted regardless :type num: int - >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1)) - [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0), datetime.datetime(2016, 1, 3, 0, 0)] + [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0), + datetime.datetime(2016, 1, 3, 0, 0)] >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta='0 0 * * *') - [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0), datetime.datetime(2016, 1, 3, 0, 0)] + [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0), + datetime.datetime(2016, 1, 3, 0, 0)] >>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 0 * *") - [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0), datetime.datetime(2016, 3, 1, 0, 0)] + [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0), + datetime.datetime(2016, 3, 1, 0, 0)] """ if not delta: return [] @@ -82,13 +83,15 @@ def date_range( cron = croniter(delta, start_date) elif isinstance(delta, timedelta): delta = abs(delta) - l = [] + dates = [] if end_date: + if timezone.is_naive(start_date): + end_date = timezone.make_naive(end_date, tz) while start_date <= 
end_date: if timezone.is_naive(start_date): - l.append(timezone.make_aware(start_date, tz)) + dates.append(timezone.make_aware(start_date, tz)) else: - l.append(start_date) + dates.append(start_date) if delta_iscron: start_date = cron.get_next(datetime) @@ -97,9 +100,9 @@ def date_range( else: for _ in range(abs(num)): if timezone.is_naive(start_date): - l.append(timezone.make_aware(start_date, tz)) + dates.append(timezone.make_aware(start_date, tz)) else: - l.append(start_date) + dates.append(start_date) if delta_iscron: if num > 0: @@ -111,16 +114,14 @@ def date_range( start_date += delta else: start_date -= delta - return sorted(l) + return sorted(dates) def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)): """ Returns the datetime of the form start_date + i * delta which is closest to dt for any non-negative integer i. - Note that delta may be a datetime.timedelta or a dateutil.relativedelta - >>> round_time(datetime(2015, 1, 1, 6), timedelta(days=1)) datetime.datetime(2015, 1, 1, 0, 0) >>> round_time(datetime(2015, 1, 2), relativedelta(months=1)) @@ -203,7 +204,6 @@ def infer_time_unit(time_seconds_arr): """ Determine the most appropriate time unit for an array of time durations specified in seconds. - e.g. 5400 seconds => 'minutes', 36000 seconds => 'hours' """ if len(time_seconds_arr) == 0: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/08a18395/tests/operators/bash_operator.py ---------------------------------------------------------------------- diff --git a/tests/operators/bash_operator.py b/tests/operators/bash_operator.py new file mode 100644 index 0000000..1ce77e9 --- /dev/null +++ b/tests/operators/bash_operator.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import os +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.models import State +from airflow.operators.bash_operator import BashOperator +from airflow.utils import timezone + +DEFAULT_DATE = datetime(2016, 1, 1, tzinfo=timezone.utc) +END_DATE = datetime(2016, 1, 2, tzinfo=timezone.utc) +INTERVAL = timedelta(hours=12) + + +class BashOperatorTestCase(unittest.TestCase): + def test_echo_env_variables(self): + """ + Test that env variables are exported correctly to the + task bash environment. + """ + now = datetime.utcnow() + now = now.replace(tzinfo=timezone.utc) + + self.dag = DAG( + dag_id='bash_op_test', default_args={ + 'owner': 'airflow', + 'retries': 100, + 'start_date': DEFAULT_DATE + }, + schedule_interval='@daily', + dagrun_timeout=timedelta(minutes=60)) + + self.dag.create_dagrun( + run_id='manual__' + DEFAULT_DATE.isoformat(), + execution_date=DEFAULT_DATE, + start_date=now, + state=State.RUNNING, + external_trigger=False, + ) + + import tempfile + with tempfile.NamedTemporaryFile() as f: + fname = f.name + t = BashOperator( + task_id='echo_env_vars', + dag=self.dag, + bash_command='echo $AIRFLOW_HOME>> {0};' + 'echo $PYTHONPATH>> {0};'.format(fname) + ) + os.environ['AIRFLOW_HOME'] = 'MY_PATH_TO_AIRFLOW_HOME' + t.run(DEFAULT_DATE, DEFAULT_DATE, + ignore_first_depends_on_past=True, ignore_ti_state=True) + + with open(fname, 'r') as fr: + output = ''.join(fr.readlines()) + self.assertIn('MY_PATH_TO_AIRFLOW_HOME', output) + # exported in run_unit_tests.sh as part of PYTHONPATH + 
self.assertIn('tests/test_utils', output)
