This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d4a3d2b Adding feature in bash operator to append the user defined
env variable to system env variable (#18944)
d4a3d2b is described below
commit d4a3d2b1e7cf273caaf94463cbfcbcdb77bfc338
Author: PraveenA95 <[email protected]>
AuthorDate: Thu Oct 14 00:58:17 2021 +0530
Adding feature in bash operator to append the user defined env variable to
system env variable (#18944)
---
airflow/operators/bash.py | 14 +++++++++++++-
tests/operators/test_bash.py | 29 +++++++++++++++++++----------
2 files changed, 32 insertions(+), 11 deletions(-)
diff --git a/airflow/operators/bash.py b/airflow/operators/bash.py
index a551a82..0c96696 100644
--- a/airflow/operators/bash.py
+++ b/airflow/operators/bash.py
@@ -44,6 +44,11 @@ class BashOperator(BaseOperator):
of inheriting the current process environment, which is the default
behavior. (templated)
:type env: dict
+ :param append_env: If False(default) uses the environment variables passed
in env params
+ and does not inherit the current process environment. If True,
inherits the environment variables
+ from current passes and then environment variable passed by the user
will either update the existing
+ inherited environment variables or the new variables gets appended to
it
+ :type append_env: bool
:param output_encoding: Output encoding of bash command
:type output_encoding: str
:param skip_exit_code: If task exits with this exit code, leave the task
@@ -135,6 +140,7 @@ class BashOperator(BaseOperator):
*,
bash_command: str,
env: Optional[Dict[str, str]] = None,
+ append_env: bool = False,
output_encoding: str = 'utf-8',
skip_exit_code: int = 99,
cwd: str = None,
@@ -146,6 +152,7 @@ class BashOperator(BaseOperator):
self.output_encoding = output_encoding
self.skip_exit_code = skip_exit_code
self.cwd = cwd
+ self.append_env = append_env
if kwargs.get('xcom_push') is not None:
raise AirflowException("'xcom_push' was deprecated, use
'BaseOperator.do_xcom_push' instead")
@@ -156,9 +163,14 @@ class BashOperator(BaseOperator):
def get_env(self, context):
"""Builds the set of environment variables to be exposed for the bash
command"""
+ system_env = os.environ.copy()
env = self.env
if env is None:
- env = os.environ.copy()
+ env = system_env
+ else:
+ if self.append_env:
+ system_env.update(env)
+ env = system_env
airflow_context_vars = context_to_airflow_vars(context,
in_env_var_format=True)
self.log.debug(
diff --git a/tests/operators/test_bash.py b/tests/operators/test_bash.py
index 36e4744..412edfa 100644
--- a/tests/operators/test_bash.py
+++ b/tests/operators/test_bash.py
@@ -25,7 +25,6 @@ import pytest
from parameterized import parameterized
from airflow.exceptions import AirflowException, AirflowSkipException
-from airflow.models import DagRun
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.utils import timezone
@@ -38,11 +37,26 @@ INTERVAL = timedelta(hours=12)
class TestBashOperator(unittest.TestCase):
- def test_echo_env_variables(self):
+ @parameterized.expand(
+ [
+ (False, None, 'MY_PATH_TO_AIRFLOW_HOME'),
+ (True, {'AIRFLOW_HOME': 'OVERRIDDEN_AIRFLOW_HOME'},
'OVERRIDDEN_AIRFLOW_HOME'),
+ ]
+ )
+ def test_echo_env_variables(self, append_env, user_defined_env,
expected_airflow_home):
"""
Test that env variables are exported correctly to the task bash
environment.
"""
utc_now = datetime.utcnow().replace(tzinfo=timezone.utc)
+ expected = (
+ f"{expected_airflow_home}\n"
+ "AWESOME_PYTHONPATH\n"
+ "bash_op_test\n"
+ "echo_env_vars\n"
+ f"{utc_now.isoformat()}\n"
+ f"manual__{utc_now.isoformat()}\n"
+ )
+
dag = DAG(
dag_id='bash_op_test',
default_args={'owner': 'airflow', 'retries': 100, 'start_date':
DEFAULT_DATE},
@@ -68,6 +82,8 @@ class TestBashOperator(unittest.TestCase):
'echo $AIRFLOW_CTX_TASK_ID>> {0};'
'echo $AIRFLOW_CTX_EXECUTION_DATE>> {0};'
'echo $AIRFLOW_CTX_DAG_RUN_ID>> {0};'.format(tmp_file.name),
+ append_env=append_env,
+ env=user_defined_env,
)
with mock.patch.dict(
@@ -77,13 +93,7 @@ class TestBashOperator(unittest.TestCase):
with open(tmp_file.name) as file:
output = ''.join(file.readlines())
- assert 'MY_PATH_TO_AIRFLOW_HOME' in output
- # exported in run-tests as part of PYTHONPATH
- assert 'AWESOME_PYTHONPATH' in output
- assert 'bash_op_test' in output
- assert 'echo_env_vars' in output
- assert utc_now.isoformat() in output
- assert DagRun.generate_run_id(DagRunType.MANUAL, utc_now) in
output
+ assert expected == output
@parameterized.expand(
[
@@ -147,7 +157,6 @@ class TestBashOperator(unittest.TestCase):
BashOperator(task_id='abc', bash_command=test_cmd,
cwd=tmp_file.name).execute({})
def test_valid_cwd(self):
-
test_cmd = 'set -e; echo "xxxx" |tee outputs.txt'
with TemporaryDirectory(prefix='test_command_with_cwd') as
test_cwd_folder:
# Test everything went alright