This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d4a3d2b  Adding feature in bash operator to append the user defined 
env variable to system env variable (#18944)
d4a3d2b is described below

commit d4a3d2b1e7cf273caaf94463cbfcbcdb77bfc338
Author: PraveenA95 <[email protected]>
AuthorDate: Thu Oct 14 00:58:17 2021 +0530

    Adding feature in bash operator to append the user defined env variable to 
system env variable (#18944)
---
 airflow/operators/bash.py    | 14 +++++++++++++-
 tests/operators/test_bash.py | 29 +++++++++++++++++++----------
 2 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/airflow/operators/bash.py b/airflow/operators/bash.py
index a551a82..0c96696 100644
--- a/airflow/operators/bash.py
+++ b/airflow/operators/bash.py
@@ -44,6 +44,11 @@ class BashOperator(BaseOperator):
         of inheriting the current process environment, which is the default
         behavior. (templated)
     :type env: dict
+    :param append_env: If False(default) uses the environment variables passed 
in env params
+        and does not inherit the current process environment. If True, 
inherits the environment variables
+        from current passes and then environment variable passed by the user 
will either update the existing
+        inherited environment variables or the new variables gets appended to 
it
+    :type append_env: bool
     :param output_encoding: Output encoding of bash command
     :type output_encoding: str
     :param skip_exit_code: If task exits with this exit code, leave the task
@@ -135,6 +140,7 @@ class BashOperator(BaseOperator):
         *,
         bash_command: str,
         env: Optional[Dict[str, str]] = None,
+        append_env: bool = False,
         output_encoding: str = 'utf-8',
         skip_exit_code: int = 99,
         cwd: str = None,
@@ -146,6 +152,7 @@ class BashOperator(BaseOperator):
         self.output_encoding = output_encoding
         self.skip_exit_code = skip_exit_code
         self.cwd = cwd
+        self.append_env = append_env
         if kwargs.get('xcom_push') is not None:
             raise AirflowException("'xcom_push' was deprecated, use 
'BaseOperator.do_xcom_push' instead")
 
@@ -156,9 +163,14 @@ class BashOperator(BaseOperator):
 
     def get_env(self, context):
         """Builds the set of environment variables to be exposed for the bash 
command"""
+        system_env = os.environ.copy()
         env = self.env
         if env is None:
-            env = os.environ.copy()
+            env = system_env
+        else:
+            if self.append_env:
+                system_env.update(env)
+                env = system_env
 
         airflow_context_vars = context_to_airflow_vars(context, 
in_env_var_format=True)
         self.log.debug(
diff --git a/tests/operators/test_bash.py b/tests/operators/test_bash.py
index 36e4744..412edfa 100644
--- a/tests/operators/test_bash.py
+++ b/tests/operators/test_bash.py
@@ -25,7 +25,6 @@ import pytest
 from parameterized import parameterized
 
 from airflow.exceptions import AirflowException, AirflowSkipException
-from airflow.models import DagRun
 from airflow.models.dag import DAG
 from airflow.operators.bash import BashOperator
 from airflow.utils import timezone
@@ -38,11 +37,26 @@ INTERVAL = timedelta(hours=12)
 
 
 class TestBashOperator(unittest.TestCase):
-    def test_echo_env_variables(self):
+    @parameterized.expand(
+        [
+            (False, None, 'MY_PATH_TO_AIRFLOW_HOME'),
+            (True, {'AIRFLOW_HOME': 'OVERRIDDEN_AIRFLOW_HOME'}, 
'OVERRIDDEN_AIRFLOW_HOME'),
+        ]
+    )
+    def test_echo_env_variables(self, append_env, user_defined_env, 
expected_airflow_home):
         """
         Test that env variables are exported correctly to the task bash 
environment.
         """
         utc_now = datetime.utcnow().replace(tzinfo=timezone.utc)
+        expected = (
+            f"{expected_airflow_home}\n"
+            "AWESOME_PYTHONPATH\n"
+            "bash_op_test\n"
+            "echo_env_vars\n"
+            f"{utc_now.isoformat()}\n"
+            f"manual__{utc_now.isoformat()}\n"
+        )
+
         dag = DAG(
             dag_id='bash_op_test',
             default_args={'owner': 'airflow', 'retries': 100, 'start_date': 
DEFAULT_DATE},
@@ -68,6 +82,8 @@ class TestBashOperator(unittest.TestCase):
                 'echo $AIRFLOW_CTX_TASK_ID>> {0};'
                 'echo $AIRFLOW_CTX_EXECUTION_DATE>> {0};'
                 'echo $AIRFLOW_CTX_DAG_RUN_ID>> {0};'.format(tmp_file.name),
+                append_env=append_env,
+                env=user_defined_env,
             )
 
             with mock.patch.dict(
@@ -77,13 +93,7 @@ class TestBashOperator(unittest.TestCase):
 
             with open(tmp_file.name) as file:
                 output = ''.join(file.readlines())
-                assert 'MY_PATH_TO_AIRFLOW_HOME' in output
-                # exported in run-tests as part of PYTHONPATH
-                assert 'AWESOME_PYTHONPATH' in output
-                assert 'bash_op_test' in output
-                assert 'echo_env_vars' in output
-                assert utc_now.isoformat() in output
-                assert DagRun.generate_run_id(DagRunType.MANUAL, utc_now) in 
output
+                assert expected == output
 
     @parameterized.expand(
         [
@@ -147,7 +157,6 @@ class TestBashOperator(unittest.TestCase):
                 BashOperator(task_id='abc', bash_command=test_cmd, 
cwd=tmp_file.name).execute({})
 
     def test_valid_cwd(self):
-
         test_cmd = 'set -e; echo "xxxx" |tee outputs.txt'
         with TemporaryDirectory(prefix='test_command_with_cwd') as 
test_cwd_folder:
             # Test everything went alright

Reply via email to