This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new aca26754e5e [Backport] BashOperator: Execute templated bash script as 
file (#43191)
aca26754e5e is described below

commit aca26754e5eb8e3bca03835e84052c5410bc0256
Author: Joffrey Bienvenu <[email protected]>
AuthorDate: Thu Oct 24 02:21:24 2024 +0200

    [Backport] BashOperator: Execute templated bash script as file (#43191)
    
    
    
    ---------
    
    Co-authored-by: Joffrey Bienvenu <[email protected]>
---
 airflow/hooks/subprocess.py                 |  19 +++-
 airflow/operators/bash.py                   |  88 +++++++++++++++---
 docs/apache-airflow/howto/operator/bash.rst | 138 ++++++++++++++++------------
 newsfragments/43191.improvement.rst         |   1 +
 tests/decorators/test_bash.py               |  35 +++++++
 tests/operators/test_bash.py                |  28 ++++++
 6 files changed, 233 insertions(+), 76 deletions(-)

diff --git a/airflow/hooks/subprocess.py b/airflow/hooks/subprocess.py
index bc20b5c20b4..4ae3cde8bdb 100644
--- a/airflow/hooks/subprocess.py
+++ b/airflow/hooks/subprocess.py
@@ -22,12 +22,27 @@ import signal
 from collections import namedtuple
 from subprocess import PIPE, STDOUT, Popen
 from tempfile import TemporaryDirectory, gettempdir
+from typing import Iterator
 
 from airflow.hooks.base import BaseHook
 
 SubprocessResult = namedtuple("SubprocessResult", ["exit_code", "output"])
 
 
[email protected]
+def working_directory(cwd: str | None = None) -> Iterator[str]:
+    """
+    Context manager for handling (temporary) working directory.
+
+    Use the given cwd as working directory, if provided.
+    Otherwise, create a temporary directory.
+    """
+    with contextlib.ExitStack() as stack:
+        if cwd is None:
+            cwd = stack.enter_context(TemporaryDirectory(prefix="airflowtmp"))
+        yield cwd
+
+
 class SubprocessHook(BaseHook):
     """Hook for running processes with the ``subprocess`` module."""
 
@@ -61,9 +76,7 @@ class SubprocessHook(BaseHook):
             or stdout
         """
         self.log.info("Tmp dir root location: %s", gettempdir())
-        with contextlib.ExitStack() as stack:
-            if cwd is None:
-                cwd = 
stack.enter_context(TemporaryDirectory(prefix="airflowtmp"))
+        with working_directory(cwd=cwd) as cwd:
 
             def pre_exec():
                 # Restore default signal disposition and invoke setsid
diff --git a/airflow/operators/bash.py b/airflow/operators/bash.py
index 2ec0341a0d1..2b9f9958d15 100644
--- a/airflow/operators/bash.py
+++ b/airflow/operators/bash.py
@@ -19,12 +19,13 @@ from __future__ import annotations
 
 import os
 import shutil
+import tempfile
 import warnings
 from functools import cached_property
 from typing import TYPE_CHECKING, Any, Callable, Container, Sequence, cast
 
-from airflow.exceptions import AirflowException, AirflowSkipException
-from airflow.hooks.subprocess import SubprocessHook
+from airflow.exceptions import AirflowException, AirflowSkipException, 
RemovedInAirflow3Warning
+from airflow.hooks.subprocess import SubprocessHook, SubprocessResult, 
working_directory
 from airflow.models.baseoperator import BaseOperator
 from airflow.utils.operator_helpers import context_to_airflow_vars
 from airflow.utils.types import ArgNotSet
@@ -63,6 +64,9 @@ class BashOperator(BaseOperator):
         If None (default), the command is run in a temporary directory.
         To use current DAG folder as the working directory,
         you might set template ``{{ dag_run.dag.folder }}``.
+        When bash_command is a '.sh' or '.bash' file, Airflow must have write
+        access to the working directory. The script will be rendered (Jinja
+        template) into a new temporary file in this directory.
     :param output_processor: Function to further process the output of the 
bash script
         (default is lambda output: output).
 
@@ -97,10 +101,14 @@ class BashOperator(BaseOperator):
 
     .. note::
 
-        Add a space after the script name when directly calling a ``.sh`` 
script with the
-        ``bash_command`` argument -- for example ``bash_command="my_script.sh 
"``.  This
-        is because Airflow tries to apply load this file and process it as a 
Jinja template to
-        it ends with ``.sh``, which will likely not be what most users want.
+        To simply execute a ``.sh`` or ``.bash`` script (without any Jinja templating), add a space after the
+        script name in the ``bash_command`` argument -- for example ``bash_command="my_script.sh "``. This
+        is because Airflow tries to load this file and process it as a Jinja template when
+        it ends with ``.sh`` or ``.bash``.
+
+        If your script contains Jinja templates, do not add the trailing space, and add the script's directory
+        to the DAG's ``template_searchpath``. If you specify a ``cwd``, Airflow must have write access to
+        this directory: the script will be rendered (Jinja template) into a new temporary file in this directory.
 
     .. warning::
 
@@ -180,6 +188,11 @@ class BashOperator(BaseOperator):
         # determine whether the bash_command value needs to re-rendered.
         self._init_bash_command_not_set = isinstance(self.bash_command, 
ArgNotSet)
 
+        # Keep a copy of the original bash_command, without the Jinja template 
rendered.
+        # This is later used to determine if the bash_command is a script or 
an inline string command.
+        # We do this later, because the bash_command is not available in 
__init__ when using @task.bash.
+        self._unrendered_bash_command: str | ArgNotSet = bash_command
+
     @cached_property
     def subprocess_hook(self):
         """Returns hook for running the bash command."""
@@ -200,7 +213,7 @@ class BashOperator(BaseOperator):
 
         
RenderedTaskInstanceFields._update_runtime_evaluated_template_fields(ti)
 
-    def get_env(self, context):
+    def get_env(self, context) -> dict:
         """Build the set of environment variables to be exposed for the bash 
command."""
         system_env = os.environ.copy()
         env = self.env
@@ -220,7 +233,7 @@ class BashOperator(BaseOperator):
         return env
 
     def execute(self, context: Context):
-        bash_path = shutil.which("bash") or "bash"
+        bash_path: str = shutil.which("bash") or "bash"
         if self.cwd is not None:
             if not os.path.exists(self.cwd):
                 raise AirflowException(f"Can not find the cwd: {self.cwd}")
@@ -234,15 +247,29 @@ class BashOperator(BaseOperator):
         # Both will ensure the correct Bash command is executed and that the 
Rendered Template view in the UI
         # displays the executed command (otherwise it will display as an 
ArgNotSet type).
         if self._init_bash_command_not_set:
+            is_inline_command = self._is_inline_command(bash_command=cast(str, 
self.bash_command))
             ti = cast("TaskInstance", context["ti"])
             self.refresh_bash_command(ti)
+        else:
+            is_inline_command = self._is_inline_command(bash_command=cast(str, 
self._unrendered_bash_command))
+
+        if is_inline_command:
+            result = self._run_inline_command(bash_path=bash_path, env=env)
+        else:
+            try:
+                result = self._run_rendered_script_file(bash_path=bash_path, 
env=env)
+            except PermissionError:
+                directory: str = self.cwd or tempfile.gettempdir()
+                warnings.warn(
+                    "BashOperator behavior for script files (`.sh` and 
`.bash`) containing Jinja templating "
+                    "will change in Airflow 3: script's content will be 
rendered into a new temporary file, "
+                    "and then executed (instead of being directly executed as 
inline command). "
+                    f"Ensure Airflow has write and execute permission in the 
`{directory}` directory.",
+                    RemovedInAirflow3Warning,
+                    stacklevel=2,
+                )
+                result = self._run_inline_command(bash_path=bash_path, env=env)
 
-        result = self.subprocess_hook.run_command(
-            command=[bash_path, "-c", self.bash_command],
-            env=env,
-            output_encoding=self.output_encoding,
-            cwd=self.cwd,
-        )
         if result.exit_code in self.skip_on_exit_code:
             raise AirflowSkipException(f"Bash command returned exit code 
{result.exit_code}. Skipping.")
         elif result.exit_code != 0:
@@ -252,5 +279,38 @@ class BashOperator(BaseOperator):
 
         return self.output_processor(result.output)
 
+    def _run_inline_command(self, bash_path: str, env: dict) -> 
SubprocessResult:
+        """Pass the bash command as string directly in the subprocess."""
+        return self.subprocess_hook.run_command(
+            command=[bash_path, "-c", self.bash_command],
+            env=env,
+            output_encoding=self.output_encoding,
+            cwd=self.cwd,
+        )
+
+    def _run_rendered_script_file(self, bash_path: str, env: dict) -> 
SubprocessResult:
+        """
+        Save the bash command into a file and execute this file.
+
+        This allows for longer commands, and prevents "Argument list too long 
error".
+        """
+        with working_directory(cwd=self.cwd) as cwd:
+            with tempfile.NamedTemporaryFile(mode="w", dir=cwd, suffix=".sh") 
as file:
+                file.write(cast(str, self.bash_command))
+                file.flush()
+
+                bash_script = os.path.basename(file.name)
+                return self.subprocess_hook.run_command(
+                    command=[bash_path, bash_script],
+                    env=env,
+                    output_encoding=self.output_encoding,
+                    cwd=cwd,
+                )
+
+    @classmethod
+    def _is_inline_command(cls, bash_command: str) -> bool:
+        """Return True if the bash command is an inline string. False if it's 
a bash script file."""
+        return not bash_command.endswith(tuple(cls.template_ext))
+
     def on_kill(self) -> None:
         self.subprocess_hook.send_sigterm()
diff --git a/docs/apache-airflow/howto/operator/bash.rst 
b/docs/apache-airflow/howto/operator/bash.rst
index daf430fa14c..b9f0934951a 100644
--- a/docs/apache-airflow/howto/operator/bash.rst
+++ b/docs/apache-airflow/howto/operator/bash.rst
@@ -231,70 +231,21 @@ Executing commands from files
 Both the ``BashOperator`` and ``@task.bash`` TaskFlow decorator enables you to 
execute Bash commands stored
 in files. The files **must** have a ``.sh`` or ``.bash`` extension.
 
-Note the space after the script name (more on this in the next section).
-
-.. tab-set::
-
-    .. tab-item:: @task.bash
-        :sync: taskflow
-
-        .. code-block:: python
-            :emphasize-lines: 3
-
-            @task.bash
-            def run_command_from_script() -> str:
-                return "$AIRFLOW_HOME/scripts/example.sh "
+With Jinja template
+"""""""""""""""""""
 
+You can execute a bash script that contains Jinja templates. When you do so, Airflow
+loads the content of your file, renders the templates, and writes the rendered script
+into a temporary file. By default, the file is placed in a new temporary directory
+(under the system temporary location, usually ``/tmp``). You can change this location with the ``cwd`` parameter.
 
-            run_script = run_command_from_script()
-
-    .. tab-item:: BashOperator
-        :sync: operator
-
-        .. code-block:: python
-            :emphasize-lines: 3
-
-            run_script = BashOperator(
-                task_id="run_command_from_script",
-                bash_command="$AIRFLOW_HOME/scripts/example.sh ",
-            )
-
-
-Jinja template not found
-""""""""""""""""""""""""
-
-If you encounter a "Template not found" exception when trying to execute a 
Bash script, add a space after the
-script name. This is because Airflow tries to apply a Jinja template to it, 
which will fail.
-
-.. tab-set::
-
-    .. tab-item:: @task.bash
-        :sync: taskflow
-
-        .. code-block:: python
-
-            @task.bash
-            def bash_example():
-                # This fails with 'Jinja template not found' error
-                # return "/home/batcher/test.sh",
-                # This works (has a space after)
-                return "/home/batcher/test.sh "
-
-    .. tab-item:: BashOperator
-        :sync: operator
+.. caution::
 
-        .. code-block:: python
+    Airflow must have write access to ``/tmp`` (or to the ``cwd`` directory, if set)
+    to be able to write the temporary file to disk.
 
-            BashOperator(
-                task_id="bash_example",
-                # This fails with 'Jinja template not found' error
-                # bash_command="/home/batcher/test.sh",
-                # This works (has a space after)
-                bash_command="/home/batcher/test.sh ",
-            )
 
-However, if you want to use templating in your Bash script, do not add the 
space
-and instead put your Bash script in a location relative to the directory 
containing
+To execute a bash script, place it in a location relative to the directory 
containing
 the DAG file. So if your DAG file is in 
``/usr/local/airflow/dags/test_dag.py``, you can
 move your ``test.sh`` file to any location under ``/usr/local/airflow/dags/`` 
(Example:
 ``/usr/local/airflow/dags/scripts/test.sh``) and pass the relative path to 
``bash_command``
@@ -357,6 +308,75 @@ locations in the DAG constructor call.
                     bash_command="test.sh ",
                 )
 
+Without Jinja template
+""""""""""""""""""""""
+
+If your script doesn't contain any Jinja templates, disable Airflow's rendering by
+adding a space after the script name.
+
+.. tab-set::
+
+    .. tab-item:: @task.bash
+        :sync: taskflow
+
+        .. code-block:: python
+            :emphasize-lines: 3
+
+            @task.bash
+            def run_command_from_script() -> str:
+                return "$AIRFLOW_HOME/scripts/example.sh "
+
+
+            run_script = run_command_from_script()
+
+    .. tab-item:: BashOperator
+        :sync: operator
+
+        .. code-block:: python
+            :emphasize-lines: 3
+
+            run_script = BashOperator(
+                task_id="run_command_from_script",
+                bash_command="$AIRFLOW_HOME/scripts/example.sh ",
+            )
+
+
+Jinja template not found
+""""""""""""""""""""""""
+
+If you encounter a "Template not found" exception when trying to execute a 
Bash script, add a space after the
+script name. This is because Airflow tries to apply a Jinja template to it, 
which will fail.
+
+.. tab-set::
+
+    .. tab-item:: @task.bash
+        :sync: taskflow
+
+        .. code-block:: python
+
+            @task.bash
+            def bash_example():
+                # This fails with 'Jinja template not found' error
+                # return "/home/batcher/test.sh",
+                # This works (has a space after)
+                return "/home/batcher/test.sh "
+
+    .. tab-item:: BashOperator
+        :sync: operator
+
+        .. code-block:: python
+
+            BashOperator(
+                task_id="bash_example",
+                # This fails with 'Jinja template not found' error
+                # bash_command="/home/batcher/test.sh",
+                # This works (has a space after)
+                bash_command="/home/batcher/test.sh ",
+            )
+
+However, if you want to use templating in your Bash script, do not add the space;
+instead, follow the `bash script with Jinja template <#with-jinja-template>`_ section.
+
 Enriching Bash with Python
 --------------------------
 
diff --git a/newsfragments/43191.improvement.rst 
b/newsfragments/43191.improvement.rst
new file mode 100644
index 00000000000..eb6a2181bd0
--- /dev/null
+++ b/newsfragments/43191.improvement.rst
@@ -0,0 +1 @@
+Bash script files (``.sh`` and ``.bash``) with Jinja templating enabled (without the space after the file extension) are now rendered into a temporary file and then executed, instead of being directly executed as an inline command.
diff --git a/tests/decorators/test_bash.py b/tests/decorators/test_bash.py
index 9fa7999e834..1b7bab66447 100644
--- a/tests/decorators/test_bash.py
+++ b/tests/decorators/test_bash.py
@@ -20,6 +20,8 @@ import os
 import stat
 import warnings
 from contextlib import nullcontext as no_raise
+from pathlib import Path
+from typing import TYPE_CHECKING
 from unittest import mock
 
 import pytest
@@ -31,6 +33,10 @@ from airflow.utils import timezone
 from airflow.utils.types import NOTSET
 from tests.test_utils.db import clear_db_dags, clear_db_runs, 
clear_rendered_ti_fields
 
+if TYPE_CHECKING:
+    from airflow.models import TaskInstance
+    from airflow.operators.bash import BashOperator
+
 DEFAULT_DATE = timezone.datetime(2023, 1, 1)
 
 # TODO(potiuk) see why this test hangs in DB isolation mode
@@ -502,3 +508,32 @@ class TestBashDecorator:
         with pytest.raises(AirflowException):
             ti.run()
         assert ti.task.bash_command == f"{DEFAULT_DATE.date()}; exit 1;"
+
+    def test_templated_bash_script(self, dag_maker, tmp_path, session):
+        """
+        Creates a .sh script with Jinja template.
+        Pass it to the BashOperator and ensure it gets correctly rendered and 
executed.
+        """
+        bash_script: str = "sample.sh"
+        path: Path = tmp_path / bash_script
+        path.write_text('echo "{{ ti.task_id }}"')
+
+        with dag_maker(
+            dag_id="test_templated_bash_script", session=session, 
template_searchpath=os.fspath(path.parent)
+        ):
+
+            @task.bash
+            def test_templated_fields_task():
+                return bash_script
+
+            test_templated_fields_task()
+
+        ti: TaskInstance = dag_maker.create_dagrun().task_instances[0]
+        session.add(ti)
+        session.commit()
+        context = ti.get_template_context(session=session)
+        ti.render_templates(context=context)
+
+        op: BashOperator = ti.task
+        result = op.execute(context=context)
+        assert result == "test_templated_fields_task"
diff --git a/tests/operators/test_bash.py b/tests/operators/test_bash.py
index f63dd0dea22..ac629423e80 100644
--- a/tests/operators/test_bash.py
+++ b/tests/operators/test_bash.py
@@ -23,6 +23,7 @@ import signal
 from datetime import datetime, timedelta
 from pathlib import Path
 from time import sleep
+from typing import TYPE_CHECKING
 from unittest import mock
 
 import pytest
@@ -33,6 +34,9 @@ from airflow.utils import timezone
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
 
+if TYPE_CHECKING:
+    from airflow.models import TaskInstance
+
 DEFAULT_DATE = datetime(2016, 1, 1, tzinfo=timezone.utc)
 END_DATE = datetime(2016, 1, 2, tzinfo=timezone.utc)
 INTERVAL = timedelta(hours=12)
@@ -55,6 +59,7 @@ class TestBashOperator:
         assert op.skip_on_exit_code == [99]
         assert op.cwd is None
         assert op._init_bash_command_not_set is False
+        assert op._unrendered_bash_command == "echo"
 
     @pytest.mark.db_test
     @pytest.mark.parametrize(
@@ -277,3 +282,26 @@ class TestBashOperator:
         assert task.bash_command == 'echo "test_templated_fields_dag"'
         assert task.env == {"FOO": "2024-02-01"}
         assert task.cwd == Path(__file__).absolute().parent.as_posix()
+
+    def test_templated_bash_script(self, dag_maker, tmp_path, session):
+        """
+        Creates a .sh script with Jinja template.
+        Pass it to the BashOperator and ensure it gets correctly rendered and 
executed.
+        """
+        bash_script: str = "sample.sh"
+        path: Path = tmp_path / bash_script
+        path.write_text('echo "{{ ti.task_id }}"')
+
+        with dag_maker(
+            dag_id="test_templated_bash_script", session=session, 
template_searchpath=os.fspath(path.parent)
+        ):
+            BashOperator(task_id="test_templated_fields_task", 
bash_command=bash_script)
+        ti: TaskInstance = dag_maker.create_dagrun().task_instances[0]
+        session.add(ti)
+        session.commit()
+        context = ti.get_template_context(session=session)
+        ti.render_templates(context=context)
+
+        task: BashOperator = ti.task
+        result = task.execute(context=context)
+        assert result == "test_templated_fields_task"

Reply via email to