This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new aca26754e5e [Backport] BashOperator: Execute templated bash script as file (#43191)
aca26754e5e is described below
commit aca26754e5eb8e3bca03835e84052c5410bc0256
Author: Joffrey Bienvenu <[email protected]>
AuthorDate: Thu Oct 24 02:21:24 2024 +0200
[Backport] BashOperator: Execute templated bash script as file (#43191)
---------
Co-authored-by: Joffrey Bienvenu <[email protected]>
---
airflow/hooks/subprocess.py | 19 +++-
airflow/operators/bash.py | 88 +++++++++++++++---
docs/apache-airflow/howto/operator/bash.rst | 138 ++++++++++++++++------------
newsfragments/43191.improvement.rst | 1 +
tests/decorators/test_bash.py | 35 +++++++
tests/operators/test_bash.py | 28 ++++++
6 files changed, 233 insertions(+), 76 deletions(-)
diff --git a/airflow/hooks/subprocess.py b/airflow/hooks/subprocess.py
index bc20b5c20b4..4ae3cde8bdb 100644
--- a/airflow/hooks/subprocess.py
+++ b/airflow/hooks/subprocess.py
@@ -22,12 +22,27 @@ import signal
from collections import namedtuple
from subprocess import PIPE, STDOUT, Popen
from tempfile import TemporaryDirectory, gettempdir
+from typing import Iterator
from airflow.hooks.base import BaseHook
SubprocessResult = namedtuple("SubprocessResult", ["exit_code", "output"])
[email protected]
+def working_directory(cwd: str | None = None) -> Iterator[str]:
+ """
+ Context manager for handling (temporary) working directory.
+
+ Use the given cwd as working directory, if provided.
+ Otherwise, create a temporary directory.
+ """
+ with contextlib.ExitStack() as stack:
+ if cwd is None:
+ cwd = stack.enter_context(TemporaryDirectory(prefix="airflowtmp"))
+ yield cwd
+
+
class SubprocessHook(BaseHook):
"""Hook for running processes with the ``subprocess`` module."""
@@ -61,9 +76,7 @@ class SubprocessHook(BaseHook):
or stdout
"""
self.log.info("Tmp dir root location: %s", gettempdir())
- with contextlib.ExitStack() as stack:
- if cwd is None:
- cwd = stack.enter_context(TemporaryDirectory(prefix="airflowtmp"))
+ with working_directory(cwd=cwd) as cwd:
def pre_exec():
# Restore default signal disposition and invoke setsid
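The extracted helper can also be used on its own; a minimal sketch, assuming the patched airflow.hooks.subprocess module above (the path below is illustrative, not part of the commit):

    from airflow.hooks.subprocess import SubprocessHook, working_directory

    # An explicit cwd is used as-is and is not cleaned up afterwards.
    with working_directory(cwd="/opt/airflow/scripts") as cwd:
        SubprocessHook().run_command(command=["bash", "-c", "pwd"], cwd=cwd)

    # Without a cwd, a TemporaryDirectory(prefix="airflowtmp") is created and
    # removed automatically when the block exits.
    with working_directory() as cwd:
        print(cwd)  # e.g. a freshly created airflowtmp* directory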
diff --git a/airflow/operators/bash.py b/airflow/operators/bash.py
index 2ec0341a0d1..2b9f9958d15 100644
--- a/airflow/operators/bash.py
+++ b/airflow/operators/bash.py
@@ -19,12 +19,13 @@ from __future__ import annotations
import os
import shutil
+import tempfile
import warnings
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, Container, Sequence, cast
-from airflow.exceptions import AirflowException, AirflowSkipException
-from airflow.hooks.subprocess import SubprocessHook
+from airflow.exceptions import AirflowException, AirflowSkipException, RemovedInAirflow3Warning
+from airflow.hooks.subprocess import SubprocessHook, SubprocessResult, working_directory
from airflow.models.baseoperator import BaseOperator
from airflow.utils.operator_helpers import context_to_airflow_vars
from airflow.utils.types import ArgNotSet
@@ -63,6 +64,9 @@ class BashOperator(BaseOperator):
If None (default), the command is run in a temporary directory.
To use current DAG folder as the working directory,
you might set template ``{{ dag_run.dag.folder }}``.
+ When bash_command is a '.sh' or '.bash' file, Airflow must have write
+ access to the working directory. The script will be rendered (Jinja
+ template) into a new temporary file in this directory.
:param output_processor: Function to further process the output of the bash script
(default is lambda output: output).
@@ -97,10 +101,14 @@ class BashOperator(BaseOperator):
.. note::
- Add a space after the script name when directly calling a ``.sh`` script with the
- ``bash_command`` argument -- for example ``bash_command="my_script.sh "``. This
- is because Airflow tries to apply load this file and process it as a Jinja template to
- it ends with ``.sh``, which will likely not be what most users want.
+ To simply execute a ``.sh`` or ``.bash`` script (without any Jinja template), add a space after the
+ script name in the ``bash_command`` argument -- for example ``bash_command="my_script.sh "``. This
+ is because Airflow tries to load this file and process it as a Jinja template when
+ it ends with ``.sh`` or ``.bash``.
+
+ If you have a Jinja template in your script, do not add any blank space, and add the script's directory
+ to the DAG's ``template_searchpath``. If you specify a ``cwd``, Airflow must have write access to
+ this directory. The script will be rendered (Jinja template) into a new temporary file in this directory.
.. warning::
@@ -180,6 +188,11 @@ class BashOperator(BaseOperator):
# determine whether the bash_command value needs to re-rendered.
self._init_bash_command_not_set = isinstance(self.bash_command, ArgNotSet)
+ # Keep a copy of the original bash_command, without the Jinja template rendered.
+ # This is later used to determine if the bash_command is a script or an inline string command.
+ # We do this later, because the bash_command is not available in __init__ when using @task.bash.
+ self._unrendered_bash_command: str | ArgNotSet = bash_command
+
@cached_property
def subprocess_hook(self):
"""Returns hook for running the bash command."""
@@ -200,7 +213,7 @@ class BashOperator(BaseOperator):
RenderedTaskInstanceFields._update_runtime_evaluated_template_fields(ti)
- def get_env(self, context):
+ def get_env(self, context) -> dict:
"""Build the set of environment variables to be exposed for the bash
command."""
system_env = os.environ.copy()
env = self.env
@@ -220,7 +233,7 @@ class BashOperator(BaseOperator):
return env
def execute(self, context: Context):
- bash_path = shutil.which("bash") or "bash"
+ bash_path: str = shutil.which("bash") or "bash"
if self.cwd is not None:
if not os.path.exists(self.cwd):
raise AirflowException(f"Can not find the cwd: {self.cwd}")
@@ -234,15 +247,29 @@ class BashOperator(BaseOperator):
# Both will ensure the correct Bash command is executed and that the Rendered Template view in the UI
# displays the executed command (otherwise it will display as an ArgNotSet type).
if self._init_bash_command_not_set:
+ is_inline_command = self._is_inline_command(bash_command=cast(str, self.bash_command))
ti = cast("TaskInstance", context["ti"])
self.refresh_bash_command(ti)
+ else:
+ is_inline_command = self._is_inline_command(bash_command=cast(str, self._unrendered_bash_command))
+
+ if is_inline_command:
+ result = self._run_inline_command(bash_path=bash_path, env=env)
+ else:
+ try:
+ result = self._run_rendered_script_file(bash_path=bash_path, env=env)
+ except PermissionError:
+ directory: str = self.cwd or tempfile.gettempdir()
+ warnings.warn(
+ "BashOperator behavior for script files (`.sh` and
`.bash`) containing Jinja templating "
+ "will change in Airflow 3: script's content will be
rendered into a new temporary file, "
+ "and then executed (instead of being directly executed as
inline command). "
+ f"Ensure Airflow has write and execute permission in the
`{directory}` directory.",
+ RemovedInAirflow3Warning,
+ stacklevel=2,
+ )
+ result = self._run_inline_command(bash_path=bash_path, env=env)
- result = self.subprocess_hook.run_command(
- command=[bash_path, "-c", self.bash_command],
- env=env,
- output_encoding=self.output_encoding,
- cwd=self.cwd,
- )
if result.exit_code in self.skip_on_exit_code:
raise AirflowSkipException(f"Bash command returned exit code {result.exit_code}. Skipping.")
elif result.exit_code != 0:
@@ -252,5 +279,38 @@ class BashOperator(BaseOperator):
return self.output_processor(result.output)
+ def _run_inline_command(self, bash_path: str, env: dict) -> SubprocessResult:
+ """Pass the bash command as string directly in the subprocess."""
+ return self.subprocess_hook.run_command(
+ command=[bash_path, "-c", self.bash_command],
+ env=env,
+ output_encoding=self.output_encoding,
+ cwd=self.cwd,
+ )
+
+ def _run_rendered_script_file(self, bash_path: str, env: dict) -> SubprocessResult:
+ """
+ Save the bash command into a file and execute this file.
+
+ This allows for longer commands and prevents the "Argument list too long" error.
+ """
+ with working_directory(cwd=self.cwd) as cwd:
+ with tempfile.NamedTemporaryFile(mode="w", dir=cwd, suffix=".sh") as file:
+ file.write(cast(str, self.bash_command))
+ file.flush()
+
+ bash_script = os.path.basename(file.name)
+ return self.subprocess_hook.run_command(
+ command=[bash_path, bash_script],
+ env=env,
+ output_encoding=self.output_encoding,
+ cwd=cwd,
+ )
+
+ @classmethod
+ def _is_inline_command(cls, bash_command: str) -> bool:
+ """Return True if the bash command is an inline string. False if it's
a bash script file."""
+ return not bash_command.endswith(tuple(cls.template_ext))
+
def on_kill(self) -> None:
self.subprocess_hook.send_sigterm()
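A hypothetical usage sketch of the new execute() branching (the task ids, script names and cwd below are illustrative, not taken from the commit): a bash_command ending in a template_ext extension with no trailing space is rendered to a temporary file and executed from it, everything else still goes through `bash -c`.

    from airflow.operators.bash import BashOperator

    # Ends with ".sh" and has no trailing space, so _is_inline_command() is
    # False: the rendered content is written to a temporary .sh file inside
    # cwd (or a TemporaryDirectory) and executed from there.
    run_templated = BashOperator(
        task_id="run_templated_script",
        bash_command="scripts/example.sh",  # resolved via template_searchpath
        cwd="/opt/airflow/scripts",  # Airflow needs write access here
    )

    # A trailing space disables Jinja loading, so the string is passed to
    # `bash -c` unchanged, exactly as before this change.
    run_inline = BashOperator(
        task_id="run_plain_script",
        bash_command="/opt/airflow/scripts/plain.sh ",
    )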
diff --git a/docs/apache-airflow/howto/operator/bash.rst b/docs/apache-airflow/howto/operator/bash.rst
index daf430fa14c..b9f0934951a 100644
--- a/docs/apache-airflow/howto/operator/bash.rst
+++ b/docs/apache-airflow/howto/operator/bash.rst
@@ -231,70 +231,21 @@ Executing commands from files
Both the ``BashOperator`` and ``@task.bash`` TaskFlow decorator enables you to execute Bash commands stored
in files. The files **must** have a ``.sh`` or ``.bash`` extension.
-Note the space after the script name (more on this in the next section).
-
-.. tab-set::
-
- .. tab-item:: @task.bash
- :sync: taskflow
-
- .. code-block:: python
- :emphasize-lines: 3
-
- @task.bash
- def run_command_from_script() -> str:
- return "$AIRFLOW_HOME/scripts/example.sh "
+With Jinja template
+"""""""""""""""""""
+You can execute a bash script that contains Jinja templates. When you do so, Airflow
+loads the content of your file, renders the templates, and writes the rendered script
+into a temporary file. By default, the file is placed in a temporary directory
+(under ``/tmp``). You can change this location with the ``cwd`` parameter.
- run_script = run_command_from_script()
-
- .. tab-item:: BashOperator
- :sync: operator
-
- .. code-block:: python
- :emphasize-lines: 3
-
- run_script = BashOperator(
- task_id="run_command_from_script",
- bash_command="$AIRFLOW_HOME/scripts/example.sh ",
- )
-
-
-Jinja template not found
-""""""""""""""""""""""""
-
-If you encounter a "Template not found" exception when trying to execute a Bash script, add a space after the
-script name. This is because Airflow tries to apply a Jinja template to it, which will fail.
-
-.. tab-set::
-
- .. tab-item:: @task.bash
- :sync: taskflow
-
- .. code-block:: python
-
- @task.bash
- def bash_example():
- # This fails with 'Jinja template not found' error
- # return "/home/batcher/test.sh",
- # This works (has a space after)
- return "/home/batcher/test.sh "
-
- .. tab-item:: BashOperator
- :sync: operator
+.. caution::
- .. code-block:: python
+ Airflow must have write access to ``/tmp`` or the ``cwd`` directory, to be
+ able to write the temporary file to the disk.
- BashOperator(
- task_id="bash_example",
- # This fails with 'Jinja template not found' error
- # bash_command="/home/batcher/test.sh",
- # This works (has a space after)
- bash_command="/home/batcher/test.sh ",
- )
-However, if you want to use templating in your Bash script, do not add the space
-and instead put your Bash script in a location relative to the directory containing
+To execute a bash script, place it in a location relative to the directory containing
the DAG file. So if your DAG file is in ``/usr/local/airflow/dags/test_dag.py``, you can
move your ``test.sh`` file to any location under ``/usr/local/airflow/dags/`` (Example:
``/usr/local/airflow/dags/scripts/test.sh``) and pass the relative path to ``bash_command``
@@ -357,6 +308,75 @@ locations in the DAG constructor call.
bash_command="test.sh ",
)
+Without Jinja template
+""""""""""""""""""""""
+
+If your script doesn't contain any Jinja template, disable Airflow's rendering by
+adding a space after the script name.
+
+.. tab-set::
+
+ .. tab-item:: @task.bash
+ :sync: taskflow
+
+ .. code-block:: python
+ :emphasize-lines: 3
+
+ @task.bash
+ def run_command_from_script() -> str:
+ return "$AIRFLOW_HOME/scripts/example.sh "
+
+
+ run_script = run_command_from_script()
+
+ .. tab-item:: BashOperator
+ :sync: operator
+
+ .. code-block:: python
+ :emphasize-lines: 3
+
+ run_script = BashOperator(
+ task_id="run_command_from_script",
+ bash_command="$AIRFLOW_HOME/scripts/example.sh ",
+ )
+
+
+Jinja template not found
+""""""""""""""""""""""""
+
+If you encounter a "Template not found" exception when trying to execute a Bash script, add a space after the
+script name. This is because Airflow tries to apply a Jinja template to it, which will fail.
+
+.. tab-set::
+
+ .. tab-item:: @task.bash
+ :sync: taskflow
+
+ .. code-block:: python
+
+ @task.bash
+ def bash_example():
+ # This fails with 'Jinja template not found' error
+ # return "/home/batcher/test.sh",
+ # This works (has a space after)
+ return "/home/batcher/test.sh "
+
+ .. tab-item:: BashOperator
+ :sync: operator
+
+ .. code-block:: python
+
+ BashOperator(
+ task_id="bash_example",
+ # This fails with 'Jinja template not found' error
+ # bash_command="/home/batcher/test.sh",
+ # This works (has a space after)
+ bash_command="/home/batcher/test.sh ",
+ )
+
+However, if you want to use templating in your Bash script, do not add the space
+and instead check the `bash script with Jinja template <#with-jinja-template>`_ section.
+
Enriching Bash with Python
--------------------------
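As a hedged end-to-end sketch of the setup the updated page describes (the dag_id, start_date and paths are illustrative only, not from the commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="bash_script_examples",
        start_date=datetime(2024, 1, 1),
        schedule=None,
        template_searchpath="/usr/local/airflow/dags/scripts",
    ):
        # Rendered as a Jinja template into a temporary file, then executed.
        templated = BashOperator(task_id="templated", bash_command="test.sh")

        # Trailing space: executed directly, no Jinja rendering involved.
        plain = BashOperator(
            task_id="plain",
            bash_command="/usr/local/airflow/dags/scripts/test.sh ",
        )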
diff --git a/newsfragments/43191.improvement.rst b/newsfragments/43191.improvement.rst
new file mode 100644
index 00000000000..eb6a2181bd0
--- /dev/null
+++ b/newsfragments/43191.improvement.rst
@@ -0,0 +1 @@
+Bash script files (``.sh`` and ``.bash``) with Jinja templating enabled (without the space after the file extension) are now rendered into a temporary file and then executed, instead of being directly executed as an inline command.
diff --git a/tests/decorators/test_bash.py b/tests/decorators/test_bash.py
index 9fa7999e834..1b7bab66447 100644
--- a/tests/decorators/test_bash.py
+++ b/tests/decorators/test_bash.py
@@ -20,6 +20,8 @@ import os
import stat
import warnings
from contextlib import nullcontext as no_raise
+from pathlib import Path
+from typing import TYPE_CHECKING
from unittest import mock
import pytest
@@ -31,6 +33,10 @@ from airflow.utils import timezone
from airflow.utils.types import NOTSET
from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_rendered_ti_fields
+if TYPE_CHECKING:
+ from airflow.models import TaskInstance
+ from airflow.operators.bash import BashOperator
+
DEFAULT_DATE = timezone.datetime(2023, 1, 1)
# TODO(potiuk) see why this test hangs in DB isolation mode
@@ -502,3 +508,32 @@ class TestBashDecorator:
with pytest.raises(AirflowException):
ti.run()
assert ti.task.bash_command == f"{DEFAULT_DATE.date()}; exit 1;"
+
+ def test_templated_bash_script(self, dag_maker, tmp_path, session):
+ """
+ Creates a .sh script with Jinja template.
+ Pass it to the BashOperator and ensure it gets correctly rendered and executed.
+ """
+ bash_script: str = "sample.sh"
+ path: Path = tmp_path / bash_script
+ path.write_text('echo "{{ ti.task_id }}"')
+
+ with dag_maker(
+ dag_id="test_templated_bash_script", session=session,
template_searchpath=os.fspath(path.parent)
+ ):
+
+ @task.bash
+ def test_templated_fields_task():
+ return bash_script
+
+ test_templated_fields_task()
+
+ ti: TaskInstance = dag_maker.create_dagrun().task_instances[0]
+ session.add(ti)
+ session.commit()
+ context = ti.get_template_context(session=session)
+ ti.render_templates(context=context)
+
+ op: BashOperator = ti.task
+ result = op.execute(context=context)
+ assert result == "test_templated_fields_task"
diff --git a/tests/operators/test_bash.py b/tests/operators/test_bash.py
index f63dd0dea22..ac629423e80 100644
--- a/tests/operators/test_bash.py
+++ b/tests/operators/test_bash.py
@@ -23,6 +23,7 @@ import signal
from datetime import datetime, timedelta
from pathlib import Path
from time import sleep
+from typing import TYPE_CHECKING
from unittest import mock
import pytest
@@ -33,6 +34,9 @@ from airflow.utils import timezone
from airflow.utils.state import State
from airflow.utils.types import DagRunType
+if TYPE_CHECKING:
+ from airflow.models import TaskInstance
+
DEFAULT_DATE = datetime(2016, 1, 1, tzinfo=timezone.utc)
END_DATE = datetime(2016, 1, 2, tzinfo=timezone.utc)
INTERVAL = timedelta(hours=12)
@@ -55,6 +59,7 @@ class TestBashOperator:
assert op.skip_on_exit_code == [99]
assert op.cwd is None
assert op._init_bash_command_not_set is False
+ assert op._unrendered_bash_command == "echo"
@pytest.mark.db_test
@pytest.mark.parametrize(
@@ -277,3 +282,26 @@ class TestBashOperator:
assert task.bash_command == 'echo "test_templated_fields_dag"'
assert task.env == {"FOO": "2024-02-01"}
assert task.cwd == Path(__file__).absolute().parent.as_posix()
+
+ def test_templated_bash_script(self, dag_maker, tmp_path, session):
+ """
+ Creates a .sh script with Jinja template.
+ Pass it to the BashOperator and ensure it gets correctly rendered and executed.
+ """
+ bash_script: str = "sample.sh"
+ path: Path = tmp_path / bash_script
+ path.write_text('echo "{{ ti.task_id }}"')
+
+ with dag_maker(
+ dag_id="test_templated_bash_script", session=session,
template_searchpath=os.fspath(path.parent)
+ ):
+ BashOperator(task_id="test_templated_fields_task", bash_command=bash_script)
+ ti: TaskInstance = dag_maker.create_dagrun().task_instances[0]
+ session.add(ti)
+ session.commit()
+ context = ti.get_template_context(session=session)
+ ti.render_templates(context=context)
+
+ task: BashOperator = ti.task
+ result = task.execute(context=context)
+ assert result == "test_templated_fields_task"
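As a quick sanity check of the classification the tests above exercise, a sketch (it relies on ``BashOperator.template_ext`` being ``(".sh", ".bash")``, so only bare script paths without a trailing space are treated as script files):

    from airflow.operators.bash import BashOperator

    assert BashOperator._is_inline_command(bash_command="echo hello")
    assert BashOperator._is_inline_command(bash_command="my_script.sh ")  # trailing space
    assert not BashOperator._is_inline_command(bash_command="my_script.sh")
    assert not BashOperator._is_inline_command(bash_command="my_script.bash")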