Repository: incubator-airflow
Updated Branches:
  refs/heads/master ef8a6ca4e -> 08a18395e


[AIRFLOW-2586] Stop getting AIRFLOW_HOME value from config file in bash operator

Closes #3484 from yrqls21/kevin_yang_fix_bash_operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/08a18395
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/08a18395
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/08a18395

Branch: refs/heads/master
Commit: 08a18395e71d9f0f2353b23c0e0112c0ed81703a
Parents: ef8a6ca
Author: Kevin Yang <[email protected]>
Authored: Fri Jun 15 13:32:04 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Fri Jun 15 13:32:04 2018 +0200

----------------------------------------------------------------------
 airflow/operators/bash_operator.py | 22 ++--------
 airflow/utils/dates.py             | 32 +++++++--------
 tests/operators/bash_operator.py   | 72 +++++++++++++++++++++++++++++++++
 3 files changed, 91 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/08a18395/airflow/operators/bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index 53a68a7..37a19db 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -24,22 +24,12 @@ import signal
 from subprocess import Popen, STDOUT, PIPE
 from tempfile import gettempdir, NamedTemporaryFile
 
-from airflow import configuration as conf
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.file import TemporaryDirectory
 
 
-# These variables are required in cases when BashOperator tasks use airflow 
specific code,
-# e.g. they import packages in the airflow context and the possibility of 
impersonation
-# gives not guarantee that these variables are available in the impersonated 
environment.
-# Hence, we need to propagate them in the Bash script used as a wrapper of 
commands in
-# this BashOperator.
-PYTHONPATH_VAR = 'PYTHONPATH'
-AIRFLOW_HOME_VAR = 'AIRFLOW_HOME'
-
-
 class BashOperator(BaseOperator):
     """
     Execute a Bash script, command or set of commands.
@@ -83,18 +73,12 @@ class BashOperator(BaseOperator):
         """
         self.log.info("Tmp dir root location: \n %s", gettempdir())
 
-        airflow_home_value = conf.get('core', AIRFLOW_HOME_VAR)
-        pythonpath_value = os.environ.get(PYTHONPATH_VAR, '')
-
-        bash_command = ('export {}={}; '.format(AIRFLOW_HOME_VAR, 
airflow_home_value) +
-                        'export {}={}; '.format(PYTHONPATH_VAR, 
pythonpath_value) +
-                        self.bash_command)
-        self.lineage_data = bash_command
+        self.lineage_data = self.bash_command
 
         with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
             with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
 
-                f.write(bytes(bash_command, 'utf_8'))
+                f.write(bytes(self.bash_command, 'utf_8'))
                 f.flush()
                 fname = f.name
                 script_location = os.path.abspath(fname)
@@ -110,7 +94,7 @@ class BashOperator(BaseOperator):
                             signal.signal(getattr(signal, sig), signal.SIG_DFL)
                     os.setsid()
 
-                self.log.info("Running command: %s", bash_command)
+                self.log.info("Running command: %s", self.bash_command)
                 sp = Popen(
                     ['bash', fname],
                     stdout=PIPE, stderr=STDOUT,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/08a18395/airflow/utils/dates.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index 090bef9..c147a65 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,7 +48,6 @@ def date_range(
     Get a set of dates as a list based on a start, end and delta, delta
     can be something that can be added to ``datetime.datetime``
     or a cron expression as a ``str``
-
     :param start_date: anchor date to start the series from
     :type start_date: datetime.datetime
     :param end_date: right boundary for the date range
@@ -57,13 +56,15 @@ def date_range(
         number of entries you want in the range. This number can be negative,
         output will always be sorted regardless
     :type num: int
-
     >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), 
delta=timedelta(1))
-    [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0), 
datetime.datetime(2016, 1, 3, 0, 0)]
+    [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0),
+     datetime.datetime(2016, 1, 3, 0, 0)]
     >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta='0 0 * * 
*')
-    [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0), 
datetime.datetime(2016, 1, 3, 0, 0)]
+    [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0),
+     datetime.datetime(2016, 1, 3, 0, 0)]
     >>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 0 * 
*")
-    [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0), 
datetime.datetime(2016, 3, 1, 0, 0)]
+    [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0),
+     datetime.datetime(2016, 3, 1, 0, 0)]
     """
     if not delta:
         return []
@@ -82,13 +83,15 @@ def date_range(
         cron = croniter(delta, start_date)
     elif isinstance(delta, timedelta):
         delta = abs(delta)
-    l = []
+    dates = []
     if end_date:
+        if timezone.is_naive(start_date):
+            end_date = timezone.make_naive(end_date, tz)
         while start_date <= end_date:
             if timezone.is_naive(start_date):
-                l.append(timezone.make_aware(start_date, tz))
+                dates.append(timezone.make_aware(start_date, tz))
             else:
-                l.append(start_date)
+                dates.append(start_date)
 
             if delta_iscron:
                 start_date = cron.get_next(datetime)
@@ -97,9 +100,9 @@ def date_range(
     else:
         for _ in range(abs(num)):
             if timezone.is_naive(start_date):
-                l.append(timezone.make_aware(start_date, tz))
+                dates.append(timezone.make_aware(start_date, tz))
             else:
-                l.append(start_date)
+                dates.append(start_date)
 
             if delta_iscron:
                 if num > 0:
@@ -111,16 +114,14 @@ def date_range(
                     start_date += delta
                 else:
                     start_date -= delta
-    return sorted(l)
+    return sorted(dates)
 
 
 def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)):
     """
     Returns the datetime of the form start_date + i * delta
     which is closest to dt for any non-negative integer i.
-
     Note that delta may be a datetime.timedelta or a dateutil.relativedelta
-
     >>> round_time(datetime(2015, 1, 1, 6), timedelta(days=1))
     datetime.datetime(2015, 1, 1, 0, 0)
     >>> round_time(datetime(2015, 1, 2), relativedelta(months=1))
@@ -203,7 +204,6 @@ def infer_time_unit(time_seconds_arr):
     """
     Determine the most appropriate time unit for an array of time durations
     specified in seconds.
-
     e.g. 5400 seconds => 'minutes', 36000 seconds => 'hours'
     """
     if len(time_seconds_arr) == 0:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/08a18395/tests/operators/bash_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/bash_operator.py b/tests/operators/bash_operator.py
new file mode 100644
index 0000000..1ce77e9
--- /dev/null
+++ b/tests/operators/bash_operator.py
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import os
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.models import State
+from airflow.operators.bash_operator import BashOperator
+from airflow.utils import timezone
+
+DEFAULT_DATE = datetime(2016, 1, 1, tzinfo=timezone.utc)
+END_DATE = datetime(2016, 1, 2, tzinfo=timezone.utc)
+INTERVAL = timedelta(hours=12)
+
+
+class BashOperatorTestCase(unittest.TestCase):
+    def test_echo_env_variables(self):
+        """
+        Test that env variables are exported correctly to the
+        task bash environment.
+        """
+        now = datetime.utcnow()
+        now = now.replace(tzinfo=timezone.utc)
+
+        self.dag = DAG(
+            dag_id='bash_op_test', default_args={
+                'owner': 'airflow',
+                'retries': 100,
+                'start_date': DEFAULT_DATE
+            },
+            schedule_interval='@daily',
+            dagrun_timeout=timedelta(minutes=60))
+
+        self.dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=now,
+            state=State.RUNNING,
+            external_trigger=False,
+        )
+
+        import tempfile
+        with tempfile.NamedTemporaryFile() as f:
+            fname = f.name
+            t = BashOperator(
+                task_id='echo_env_vars',
+                dag=self.dag,
+                bash_command='echo $AIRFLOW_HOME>> {0};'
+                             'echo $PYTHONPATH>> {0};'.format(fname)
+            )
+            os.environ['AIRFLOW_HOME'] = 'MY_PATH_TO_AIRFLOW_HOME'
+            t.run(DEFAULT_DATE, DEFAULT_DATE,
+                  ignore_first_depends_on_past=True, ignore_ti_state=True)
+
+            with open(fname, 'r') as fr:
+                output = ''.join(fr.readlines())
+                self.assertIn('MY_PATH_TO_AIRFLOW_HOME', output)
+                # exported in run_unit_tests.sh as part of PYTHONPATH
+                self.assertIn('tests/test_utils', output)

Reply via email to