This is an automated email from the ASF dual-hosted git repository.

joshfell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e920344044 Introduce @task.bash TaskFlow decorator (#30176)
e920344044 is described below

commit e9203440445b754ae9915b716bb2cdf59601cd79
Author: Josh Fell <[email protected]>
AuthorDate: Tue Dec 19 16:51:54 2023 -0500

    Introduce @task.bash TaskFlow decorator (#30176)
    
    * Introduce @task.bash TaskFlow decorator
    
    Adding a @task.bash TaskFlow decorator to the collection of existing core 
TaskFlow decorators. This particular decorator will use the return value of the 
decorated callable as the Bash command to execute using the existing 
BashOperator functionality.
---
 airflow/decorators/__init__.py                 |   3 +
 airflow/decorators/__init__.pyi                |  32 ++
 airflow/decorators/bash.py                     | 100 ++++++
 airflow/example_dags/example_bash_decorator.py | 115 +++++++
 airflow/operators/bash.py                      |  50 ++-
 docs/apache-airflow/howto/operator/bash.rst    | 342 +++++++++++++++----
 docs/apache-airflow/tutorial/taskflow.rst      |   2 +
 tests/decorators/test_bash.py                  | 453 +++++++++++++++++++++++++
 tests/operators/test_bash.py                   |  53 ++-
 9 files changed, 1051 insertions(+), 99 deletions(-)

diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py
index 31bcfb263c..c43b98ee73 100644
--- a/airflow/decorators/__init__.py
+++ b/airflow/decorators/__init__.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 from typing import Any, Callable
 
 from airflow.decorators.base import TaskDecorator
+from airflow.decorators.bash import bash_task
 from airflow.decorators.branch_external_python import 
branch_external_python_task
 from airflow.decorators.branch_python import branch_task
 from airflow.decorators.branch_virtualenv import branch_virtualenv_task
@@ -47,6 +48,7 @@ __all__ = [
     "branch_external_python_task",
     "short_circuit_task",
     "sensor_task",
+    "bash_task",
     "setup",
     "teardown",
 ]
@@ -63,6 +65,7 @@ class TaskDecoratorCollection:
     branch_external_python = staticmethod(branch_external_python_task)
     short_circuit = staticmethod(short_circuit_task)
     sensor = staticmethod(sensor_task)
+    bash = staticmethod(bash_task)
 
     __call__: Any = python  # Alias '@task' to '@task.python'.
 
diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi
index 48cdf61946..238393c034 100644
--- a/airflow/decorators/__init__.pyi
+++ b/airflow/decorators/__init__.pyi
@@ -26,6 +26,7 @@ from typing import Any, Callable, Collection, Container, 
Iterable, Mapping, over
 from kubernetes.client import models as k8s
 
 from airflow.decorators.base import FParams, FReturn, Task, TaskDecorator
+from airflow.decorators.bash import bash_task
 from airflow.decorators.branch_external_python import 
branch_external_python_task
 from airflow.decorators.branch_python import branch_task
 from airflow.decorators.branch_virtualenv import branch_virtualenv_task
@@ -54,6 +55,7 @@ __all__ = [
     "branch_external_python_task",
     "short_circuit_task",
     "sensor_task",
+    "bash_task",
     "setup",
     "teardown",
 ]
@@ -638,6 +640,36 @@ class TaskDecoratorCollection:
     def pyspark(
         self, python_callable: Callable[FParams, FReturn] | None = None
     ) -> Task[FParams, FReturn]: ...
+    @overload
+    def bash(
+        self,
+        *,
+        env: dict[str, str] | None = None,
+        append_env: bool = False,
+        output_encoding: str = "utf-8",
+        skip_on_exit_code: int = 99,
+        cwd: str | None = None,
+        **kwargs,
+    ) -> TaskDecorator:
+        """Decorator to wrap a callable into a BashOperator task.
+
+        :param bash_command: The command, set of commands or reference to a 
bash script
+            (must be '.sh' or '.bash') to be executed. (templated)
+        :param env: If env is not None, it must be a dict that defines the 
environment variables for the new
+            process; these are used instead of inheriting the current process 
environment, which is the
+            default behavior. (templated)
+    :param append_env: If False (default) uses the environment variables 
passed in env params and does not
+        inherit the current process environment. If True, inherits the 
environment variables from current
+        process and then environment variable passed by the user will 
either update the existing inherited
+            environment variables or the new variables gets appended to it
+        :param output_encoding: Output encoding of bash command
+        :param skip_on_exit_code: If task exits with this exit code, leave the 
task in ``skipped`` state
+            (default: 99). If set to ``None``, any non-zero exit code will be 
treated as a failure.
+        :param cwd: Working directory to execute the command in. If None 
(default), the command is run in a
+            temporary directory.
+        """
+    @overload
+    def bash(self, python_callable: Callable[FParams, FReturn]) -> 
Task[FParams, FReturn]: ...
 
 task: TaskDecoratorCollection
 setup: Callable
diff --git a/airflow/decorators/bash.py b/airflow/decorators/bash.py
new file mode 100644
index 0000000000..70011c3079
--- /dev/null
+++ b/airflow/decorators/bash.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import warnings
+from typing import Any, Callable, Collection, Mapping, Sequence
+
+from airflow.decorators.base import DecoratedOperator, TaskDecorator, 
task_decorator_factory
+from airflow.operators.bash import BashOperator
+from airflow.utils.context import Context, context_merge
+from airflow.utils.operator_helpers import determine_kwargs
+from airflow.utils.types import NOTSET
+
+
+class _BashDecoratedOperator(DecoratedOperator, BashOperator):
+    """Wraps a Python callable and uses the callable return value as the Bash 
command to be executed.
+
+    :param python_callable: A reference to an object that is callable.
+    :param op_kwargs: A dictionary of keyword arguments that will get unpacked
+        in your function (templated).
+    :param op_args: A list of positional arguments that will get unpacked when
+        calling your callable (templated).
+    """
+
+    template_fields: Sequence[str] = (*DecoratedOperator.template_fields, 
*BashOperator.template_fields)
+    template_fields_renderers: dict[str, str] = {
+        **DecoratedOperator.template_fields_renderers,
+        **BashOperator.template_fields_renderers,
+    }
+
+    custom_operator_name: str = "@task.bash"
+
+    def __init__(
+        self,
+        *,
+        python_callable: Callable,
+        op_args: Collection[Any] | None = None,
+        op_kwargs: Mapping[str, Any] | None = None,
+        **kwargs,
+    ) -> None:
+        if kwargs.get("multiple_outputs") is not None:
+            warnings.warn(
+                f"`multiple_outputs` is not supported in 
{self.custom_operator_name} tasks. Ignoring.",
+                stacklevel=1,
+            )
+        kwargs.pop("multiple_outputs")
+
+        super().__init__(
+            python_callable=python_callable,
+            op_args=op_args,
+            op_kwargs=op_kwargs,
+            bash_command=NOTSET,
+            **kwargs,
+        )
+
+    def execute(self, context: Context) -> Any:
+        context_merge(context, self.op_kwargs)
+        kwargs = determine_kwargs(self.python_callable, self.op_args, context)
+
+        self.bash_command = self.python_callable(*self.op_args, **kwargs)
+
+        if not isinstance(self.bash_command, str) or self.bash_command.strip() 
== "":
+            raise TypeError("The returned value from the TaskFlow callable 
must be a non-empty string.")
+
+        return super().execute(context)
+
+
+def bash_task(
+    python_callable: Callable | None = None,
+    **kwargs,
+) -> TaskDecorator:
+    """Wraps a function into a BashOperator.
+
+    Accepts kwargs for operator kwargs. Can be reused in a single DAG. This 
function is only used
+    during type checking or auto-completion.
+
+    :param python_callable: Function to decorate.
+
+    :meta private:
+    """
+    return task_decorator_factory(
+        python_callable=python_callable,
+        decorated_operator_class=_BashDecoratedOperator,
+        **kwargs,
+    )
diff --git a/airflow/example_dags/example_bash_decorator.py 
b/airflow/example_dags/example_bash_decorator.py
new file mode 100644
index 0000000000..ee8151f337
--- /dev/null
+++ b/airflow/example_dags/example_bash_decorator.py
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pendulum
+
+from airflow.decorators import dag, task
+from airflow.exceptions import AirflowSkipException
+from airflow.models.baseoperator import chain
+from airflow.operators.empty import EmptyOperator
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.utils.weekday import WeekDay
+
+
+@dag(schedule=None, start_date=pendulum.datetime(2023, 1, 1, tz="UTC"), 
catchup=False)
+def example_bash_decorator():
+    @task.bash
+    def run_me(sleep_seconds: int, task_instance_key_str: str) -> str:
+        return f"echo {task_instance_key_str} && sleep {sleep_seconds}"
+
+    run_me_loop = [run_me.override(task_id=f"runme_{i}")(sleep_seconds=i) for 
i in range(3)]
+
+    # [START howto_decorator_bash]
+    @task.bash
+    def run_after_loop() -> str:
+        return "echo 1"
+
+    run_this = run_after_loop()
+    # [END howto_decorator_bash]
+
+    # [START howto_decorator_bash_template]
+    @task.bash
+    def also_run_this() -> str:
+        return 'echo "ti_key={{ task_instance_key_str }}"'
+
+    also_this = also_run_this()
+    # [END howto_decorator_bash_template]
+
+    # [START howto_decorator_bash_context_vars]
+    @task.bash
+    def also_run_this_again(task_instance_key_str) -> str:
+        return f'echo "ti_key={task_instance_key_str}"'
+
+    also_this_again = also_run_this_again()
+    # [END howto_decorator_bash_context_vars]
+
+    # [START howto_decorator_bash_skip]
+    @task.bash
+    def this_will_skip() -> str:
+        return 'echo "hello world"; exit 99;'
+
+    this_skips = this_will_skip()
+    # [END howto_decorator_bash_skip]
+
+    run_this_last = EmptyOperator(task_id="run_this_last", 
trigger_rule=TriggerRule.ALL_DONE)
+
+    # [START howto_decorator_bash_conditional]
+    @task.bash
+    def sleep_in(day: str) -> str:
+        if day in (WeekDay.SATURDAY, WeekDay.SUNDAY):
+            return f"sleep {60 * 60}"
+        else:
+            raise AirflowSkipException("No sleeping in today!")
+
+    sleep_in(day="{{ dag_run.logical_date.strftime('%A').lower() }}")
+    # [END howto_decorator_bash_conditional]
+
+    # [START howto_decorator_bash_parametrize]
+    @task.bash(env={"BASE_DIR": "{{ dag_run.logical_date.strftime('%Y/%m/%d') 
}}"}, append_env=True)
+    def make_dynamic_dirs(new_dirs: str) -> str:
+        return f"mkdir -p $AIRFLOW_HOME/$BASE_DIR/{new_dirs}"
+
+    make_dynamic_dirs(new_dirs="foo/bar/baz")
+    # [END howto_decorator_bash_parametrize]
+
+    # [START howto_decorator_bash_build_cmd]
+    def _get_files_in_cwd() -> list[str]:
+        from pathlib import Path
+
+        dir_contents = Path.cwd().glob("airflow/example_dags/*.py")
+        files = [str(elem) for elem in dir_contents if elem.is_file()]
+
+        return files
+
+    @task.bash
+    def get_file_stats() -> str:
+        files = _get_files_in_cwd()
+        cmd = "stat "
+        cmd += " ".join(files)
+
+        return cmd
+
+    get_file_stats()
+    # [END howto_decorator_bash_build_cmd]
+
+    chain(run_me_loop, run_this)
+    chain([also_this, also_this_again, this_skips, run_this], run_this_last)
+
+
+example_bash_decorator()
diff --git a/airflow/operators/bash.py b/airflow/operators/bash.py
index 08fdaff1cb..d913b2b282 100644
--- a/airflow/operators/bash.py
+++ b/airflow/operators/bash.py
@@ -21,14 +21,16 @@ import os
 import shutil
 import warnings
 from functools import cached_property
-from typing import TYPE_CHECKING, Container, Sequence
+from typing import TYPE_CHECKING, Container, Sequence, cast
 
 from airflow.exceptions import AirflowException, AirflowSkipException
 from airflow.hooks.subprocess import SubprocessHook
 from airflow.models.baseoperator import BaseOperator
 from airflow.utils.operator_helpers import context_to_airflow_vars
+from airflow.utils.types import ArgNotSet
 
 if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
     from airflow.utils.context import Context
 
 
@@ -44,7 +46,7 @@ class BashOperator(BaseOperator):
     will also be pushed to an XCom when the bash command completes
 
     :param bash_command: The command, set of commands or reference to a
-        bash script (must be '.sh') to be executed. (templated)
+        Bash script (must be '.sh' or '.bash') to be executed. (templated)
     :param env: If env is not None, it must be a dict that defines the
         environment variables for the new process; these are used instead
         of inheriting the current process environment, which is the default
@@ -53,14 +55,14 @@ class BashOperator(BaseOperator):
         and does not inherit the current process environment. If True, 
inherits the environment variables
         from current passes and then environment variable passed by the user 
will either update the existing
         inherited environment variables or the new variables gets appended to 
it
-    :param output_encoding: Output encoding of bash command
+    :param output_encoding: Output encoding of Bash command
     :param skip_on_exit_code: If task exits with this exit code, leave the task
         in ``skipped`` state (default: 99). If set to ``None``, any non-zero
         exit code will be treated as a failure.
     :param cwd: Working directory to execute the command in.
         If None (default), the command is run in a temporary directory.
 
-    Airflow will evaluate the exit code of the bash command. In general, a 
non-zero exit code will result in
+    Airflow will evaluate the exit code of the Bash command. In general, a 
non-zero exit code will result in
     task failure and zero will result in task success.
     Exit code ``99`` (or another set in ``skip_on_exit_code``)
     will throw an :class:`airflow.exceptions.AirflowSkipException`, which will 
leave the task in ``skipped``
@@ -85,7 +87,6 @@ class BashOperator(BaseOperator):
         code.  This can be an issue if the non-zero exit arises from a 
sub-command.  The easiest way of
         addressing this is to prefix the command with ``set -e;``
 
-        Example:
         .. code-block:: python
 
             bash_command = "set -e; python3 script.py '{{ next_execution_date 
}}'"
@@ -131,16 +132,13 @@ class BashOperator(BaseOperator):
 
     template_fields: Sequence[str] = ("bash_command", "env")
     template_fields_renderers = {"bash_command": "bash", "env": "json"}
-    template_ext: Sequence[str] = (
-        ".sh",
-        ".bash",
-    )
+    template_ext: Sequence[str] = (".sh", ".bash")
     ui_color = "#f0ede4"
 
     def __init__(
         self,
         *,
-        bash_command: str,
+        bash_command: str | ArgNotSet,
         env: dict[str, str] | None = None,
         append_env: bool = False,
         output_encoding: str = "utf-8",
@@ -168,11 +166,32 @@ class BashOperator(BaseOperator):
         self.cwd = cwd
         self.append_env = append_env
 
+        # When using the @task.bash decorator, the Bash command is not known 
until the underlying Python
+        # callable is executed and therefore set to NOTSET initially. This 
flag is useful during execution to
+        # determine whether the bash_command value needs to be re-rendered.
+        self._init_bash_command_not_set = isinstance(self.bash_command, 
ArgNotSet)
+
     @cached_property
     def subprocess_hook(self):
         """Returns hook for running the bash command."""
         return SubprocessHook()
 
+    @staticmethod
+    def refresh_bash_command(ti: TaskInstance) -> None:
+        """Rewrite the underlying rendered bash_command value for a task 
instance in the metadatabase.
+
+        TaskInstance.get_rendered_template_fields() cannot be used because 
this will retrieve the
+        RenderedTaskInstanceFields from the metadatabase which doesn't have 
the runtime-evaluated bash_command
+        value.
+
+        :meta private:
+        """
+        from airflow.models.renderedtifields import RenderedTaskInstanceFields
+
+        rtif = RenderedTaskInstanceFields(ti)  # Templates are rendered by 
default here.
+        RenderedTaskInstanceFields.write(rtif)
+        RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id)
+
     def get_env(self, context):
         """Build the set of environment variables to be exposed for the bash 
command."""
         system_env = os.environ.copy()
@@ -200,6 +219,16 @@ class BashOperator(BaseOperator):
             if not os.path.isdir(self.cwd):
                 raise AirflowException(f"The cwd {self.cwd} must be a 
directory")
         env = self.get_env(context)
+
+        # Because the bash_command value is evaluated at runtime using the 
@task.bash decorator, the
+        # RenderedTaskInstanceField data needs to be rewritten and the 
bash_command value re-rendered -- the
+        # latter because the returned command from the decorated callable 
could contain a Jinja expression.
+        # Both will ensure the correct Bash command is executed and that the 
Rendered Template view in the UI
+        # displays the executed command (otherwise it will display as an 
ArgNotSet type).
+        if self._init_bash_command_not_set:
+            ti = cast("TaskInstance", context["ti"])
+            self.refresh_bash_command(ti)
+
         result = self.subprocess_hook.run_command(
             command=[bash_path, "-c", self.bash_command],
             env=env,
@@ -212,6 +241,7 @@ class BashOperator(BaseOperator):
             raise AirflowException(
                 f"Bash command failed. The command returned a non-zero exit 
code {result.exit_code}."
             )
+
         return result.output
 
     def on_kill(self) -> None:
diff --git a/docs/apache-airflow/howto/operator/bash.rst 
b/docs/apache-airflow/howto/operator/bash.rst
index 10b669099f..50d7286fe1 100644
--- a/docs/apache-airflow/howto/operator/bash.rst
+++ b/docs/apache-airflow/howto/operator/bash.rst
@@ -23,108 +23,265 @@ BashOperator
 ============
 
 Use the :class:`~airflow.operators.bash.BashOperator` to execute
-commands in a `Bash <https://www.gnu.org/software/bash/>`__ shell.
+commands in a `Bash <https://www.gnu.org/software/bash/>`__ shell. The Bash 
command or script to execute is
+determined by:
+
+1. The ``bash_command`` argument when using ``BashOperator``, or
+
+2. If using the TaskFlow decorator, ``@task.bash``, a non-empty string value 
returned from the decorated callable.
+
+
+.. tip::
+
+    The ``@task.bash`` decorator is recommended over the classic 
``BashOperator`` to execute Bash commands.
+
+
+.. tab-set::
+
+    .. tab-item:: @task.bash
+        :sync: taskflow
+
+        .. exampleinclude:: 
/../../airflow/example_dags/example_bash_decorator.py
+            :language: python
+            :dedent: 4
+            :start-after: [START howto_decorator_bash]
+            :end-before: [END howto_decorator_bash]
+
+    .. tab-item:: BashOperator
+        :sync: operator
+
+        .. exampleinclude:: 
/../../airflow/example_dags/example_bash_operator.py
+            :language: python
+            :dedent: 4
+            :start-after: [START howto_operator_bash]
+            :end-before: [END howto_operator_bash]
 
-.. exampleinclude:: /../../airflow/example_dags/example_bash_operator.py
-    :language: python
-    :dedent: 4
-    :start-after: [START howto_operator_bash]
-    :end-before: [END howto_operator_bash]
 
 Templating
 ----------
 
-You can use :ref:`Jinja templates <concepts:jinja-templating>` to parameterize 
the
-``bash_command`` argument.
+You can use :ref:`Jinja templates <concepts:jinja-templating>` to parameterize 
the Bash command.
+
+.. tab-set::
+
+    .. tab-item:: @task.bash
+        :sync: taskflow
+
+        .. exampleinclude:: 
/../../airflow/example_dags/example_bash_decorator.py
+            :language: python
+            :dedent: 4
+            :start-after: [START howto_decorator_bash_template]
+            :end-before: [END howto_decorator_bash_template]
+
+    .. tab-item:: BashOperator
+        :sync: operator
+
+        .. exampleinclude:: 
/../../airflow/example_dags/example_bash_operator.py
+            :language: python
+            :dedent: 4
+            :start-after: [START howto_operator_bash_template]
+            :end-before: [END howto_operator_bash_template]
+
+Using the ``@task.bash`` TaskFlow decorator allows you to return a formatted 
string and take advantage of
+having all :ref:`execution context variables directly accessible to decorated 
tasks <taskflow/accessing_context_variables>`.
 
-.. exampleinclude:: /../../airflow/example_dags/example_bash_operator.py
+.. exampleinclude:: /../../airflow/example_dags/example_bash_decorator.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_bash_template]
-    :end-before: [END howto_operator_bash_template]
+    :start-after: [START howto_decorator_bash_context_vars]
+    :end-before: [END howto_decorator_bash_context_vars]
 
+You are encouraged to take advantage of this approach as it fits nicely into 
the overall TaskFlow paradigm.
 
-.. warning::
+.. caution::
 
-    Care should be taken with "user" input or when using Jinja templates in the
-    ``bash_command``, as this bash operator does not perform any escaping or
-    sanitization of the command.
+    Care should be taken with "user" input when using Jinja templates in the 
Bash command as escaping and
+    sanitization of the Bash command is not performed.
 
-    This applies mostly to using "dag_run" conf, as that can be submitted via
-    users in the Web UI. Most of the default template variables are not at
-    risk.
+    This applies mostly to using ``dag_run.conf``, as that can be submitted 
via users in the Web UI. Most of
+    the default template variables are not at risk.
 
-For example, do **not** do this:
+    For example, do **not** do:
 
-.. code-block:: python
+    .. tab-set::
 
-    bash_task = BashOperator(
-        task_id="bash_task",
-        bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] 
if dag_run else "" }}\'"',
-    )
+        .. tab-item:: @task.bash
+            :sync: taskflow
 
-Instead, you should pass this via the ``env`` kwarg and use double-quotes
-inside the bash_command, as below:
+            .. code-block:: python
 
-.. code-block:: python
+                @task.bash
+                def bash_task() -> str:
+                    return 'echo "Here is the message: \'{{ 
dag_run.conf["message"] if dag_run.conf else "" }}\'"'
+
+
+                # Or directly accessing `dag_run.conf`
+                @task.bash
+                def bash_task(dag_run) -> str:
+                    message = dag_run.conf["message"] if dag_run.conf else ""
+                    return f'echo "here is the message: {message}"'
+
+        .. tab-item:: BashOperator
+            :sync: operator
+
+            .. code-block:: python
+
+                bash_task = BashOperator(
+                    task_id="bash_task",
+                    bash_command='echo "Here is the message: \'{{ 
dag_run.conf["message"] if dag_run.conf else "" }}\'"',
+                )
+
+
+    Instead, you should pass this via the ``env`` kwarg and use double-quotes 
inside the Bash command.
+
+    .. tab-set::
+
+        .. tab-item:: @task.bash
+            :sync: taskflow
+
+            .. code-block:: python
+
+                @task.bash(env={"message": '{{ dag_run.conf["message"] if 
dag_run.conf else "" }}'})
+                def bash_task() -> str:
+                    return "echo \"here is the message: '$message'\""
+
+        .. tab-item:: BashOperator
+            :sync: operator
+
+            .. code-block:: python
+
+                bash_task = BashOperator(
+                    task_id="bash_task",
+                    bash_command="echo \"here is the message: '$message'\"",
+                    env={"message": '{{ dag_run.conf["message"] if 
dag_run.conf else "" }}'},
+                )
 
-    bash_task = BashOperator(
-        task_id="bash_task",
-        bash_command="echo \"here is the message: '$message'\"",
-        env={"message": '{{ dag_run.conf["message"] if dag_run else "" }}'},
-    )
 
 Skipping
 --------
 
-In general a non-zero exit code produces an AirflowException and thus a task 
failure.  In cases where it is desirable
-to instead have the task end in a ``skipped`` state, you can exit with code 
``99`` (or with another exit code if you
-pass ``skip_exit_code``).
+In general a non-zero exit code produces an AirflowException and thus a task 
failure.  In cases where it is
+desirable to instead have the task end in a ``skipped`` state, you can exit 
with code ``99`` (or with another
+exit code if you pass ``skip_on_exit_code``).
 
-.. exampleinclude:: /../../airflow/example_dags/example_bash_operator.py
-    :language: python
-    :start-after: [START howto_operator_bash_skip]
-    :end-before: [END howto_operator_bash_skip]
+.. tab-set::
+
+    .. tab-item:: @task.bash
+        :sync: taskflow
+
+        .. exampleinclude:: 
/../../airflow/example_dags/example_bash_decorator.py
+            :language: python
+            :dedent: 4
+            :start-after: [START howto_decorator_bash_skip]
+            :end-before: [END howto_decorator_bash_skip]
+
+    .. tab-item:: BashOperator
+        :sync: operator
+
+        .. exampleinclude:: 
/../../airflow/example_dags/example_bash_operator.py
+            :language: python
+            :start-after: [START howto_operator_bash_skip]
+            :end-before: [END howto_operator_bash_skip]
 
 
-Troubleshooting
----------------
+Executing commands from files
+-----------------------------
+Both the ``BashOperator`` and the ``@task.bash`` TaskFlow decorator enable you to 
execute Bash commands stored
+in files. The files **must** have a ``.sh`` or ``.bash`` extension.
+
+Note the space after the script name (more on this in the next section).
+
+.. tab-set::
+
+    .. tab-item:: @task.bash
+        :sync: taskflow
+
+        .. code-block:: python
+            :emphasize-lines: 3
+
+            @task.bash
+            def run_command_from_script() -> str:
+                return "$AIRFLOW_HOME/scripts/example.sh "
+
+
+            run_script = run_command_from_script()
+
+    .. tab-item:: BashOperator
+        :sync: operator
+
+        .. code-block:: python
+            :emphasize-lines: 3
+
+            run_script = BashOperator(
+                task_id="run_command_from_script",
+                bash_command="$AIRFLOW_HOME/scripts/example.sh ",
+            )
+
 
 Jinja template not found
 """"""""""""""""""""""""
 
-Add a space after the script name when directly calling a Bash script with
-the ``bash_command`` argument. This is because Airflow tries to apply a Jinja
-template to it, which will fail.
+If you encounter a "Template not found" exception when trying to execute a 
Bash script, add a space after the
+script name. This is because Airflow tries to apply a Jinja template to it, 
which will fail.
+
+.. tab-set::
+
+    .. tab-item:: @task.bash
+        :sync: taskflow
+
+        .. code-block:: python
+
+            @task.bash
+            def bash_example():
+                # This fails with 'Jinja template not found' error
+                # return "/home/batcher/test.sh",
+                # This works (has a space after)
+                return "/home/batcher/test.sh "
 
-.. code-block:: python
+    .. tab-item:: BashOperator
+        :sync: operator
 
-    t2 = BashOperator(
-        task_id="bash_example",
-        # This fails with 'Jinja template not found' error
-        # bash_command="/home/batcher/test.sh",
-        # This works (has a space after)
-        bash_command="/home/batcher/test.sh ",
-        dag=dag,
-    )
+        .. code-block:: python
 
-However, if you want to use templating in your bash script, do not add the 
space
-and instead put your bash script in a location relative to the directory 
containing
+            BashOperator(
+                task_id="bash_example",
+                # This fails with 'Jinja template not found' error
+                # bash_command="/home/batcher/test.sh",
+                # This works (has a space after)
+                bash_command="/home/batcher/test.sh ",
+            )
+
+However, if you want to use templating in your Bash script, do not add the 
space
+and instead put your Bash script in a location relative to the directory 
containing
 the DAG file. So if your DAG file is in 
``/usr/local/airflow/dags/test_dag.py``, you can
 move your ``test.sh`` file to any location under ``/usr/local/airflow/dags/`` 
(Example:
 ``/usr/local/airflow/dags/scripts/test.sh``) and pass the relative path to 
``bash_command``
 as shown below:
 
-.. code-block:: python
+.. tab-set::
+
+    .. tab-item:: @task.bash
+        :sync: taskflow
+
+        .. code-block:: python
+
+            @task.bash
+            def bash_example():
+                # "scripts" folder is under "/usr/local/airflow/dags"
+                return "scripts/test.sh"
 
-    t2 = BashOperator(
-        task_id="bash_example",
-        # "scripts" folder is under "/usr/local/airflow/dags"
-        bash_command="scripts/test.sh",
-        dag=dag,
-    )
+    .. tab-item:: BashOperator
+        :sync: operator
 
-Creating separate folder for bash scripts may be desirable for many reasons, 
like
+        .. code-block:: python
+
+            t2 = BashOperator(
+                task_id="bash_example",
+                # "scripts" folder is under "/usr/local/airflow/dags"
+                bash_command="scripts/test.sh",
+            )
+
+Creating separate folder for Bash scripts may be desirable for many reasons, 
like
 separating your script's logic and pipeline code, allowing for proper code 
highlighting
 in files composed in different languages, and general flexibility in 
structuring
 pipelines.
@@ -132,17 +289,58 @@ pipelines.
 It is also possible to define your ``template_searchpath`` as pointing to any 
folder
 locations in the DAG constructor call.
 
-Example:
+.. tab-set::
+
+    .. tab-item:: @task.bash
+        :sync: taskflow
+
+        .. code-block:: python
+            :emphasize-lines: 1
+
+            @dag(..., template_searchpath="/opt/scripts")
+            def example_bash_dag():
+                @task.bash
+                def bash_example():
+                    return "test.sh "
+
+    .. tab-item:: BashOperator
+        :sync: operator
+
+        .. code-block:: python
+            :emphasize-lines: 1
+
+            with DAG("example_bash_dag", ..., 
template_searchpath="/opt/scripts"):
+                t2 = BashOperator(
+                    task_id="bash_example",
+                    bash_command="test.sh ",
+                )
+
+Enriching Bash with Python
+--------------------------
 
-.. code-block:: python
+The ``@task.bash`` TaskFlow decorator allows you to combine both Bash and 
Python into a powerful combination
+within a task.
+
+Using Python conditionals, other function calls, etc. within a ``@task.bash`` 
task can help define, augment,
+or even build the Bash command(s) to execute.
+
+For example, use conditional logic to determine task behavior:
+
+.. exampleinclude:: /../../airflow/example_dags/example_bash_decorator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_decorator_bash_conditional]
+    :end-before: [END howto_decorator_bash_conditional]
+
+Or call a function to help build a Bash command:
+
+.. exampleinclude:: /../../airflow/example_dags/example_bash_decorator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_decorator_bash_build_cmd]
+    :end-before: [END howto_decorator_bash_build_cmd]
 
-    dag = DAG("example_bash_dag", template_searchpath="/opt/scripts")
-    t2 = BashOperator(
-        task_id="bash_example",
-        # "test.sh" is a file under "/opt/scripts"
-        bash_command="test.sh ",
-        dag=dag,
-    )
+There are numerous possibilities with this type of pre-execution enrichment.
 
 
 .. _howto/operator:BashSensor:
diff --git a/docs/apache-airflow/tutorial/taskflow.rst 
b/docs/apache-airflow/tutorial/taskflow.rst
index 08967bdde3..5d71576b59 100644
--- a/docs/apache-airflow/tutorial/taskflow.rst
+++ b/docs/apache-airflow/tutorial/taskflow.rst
@@ -576,6 +576,8 @@ task to copy the same file to a date-partitioned storage 
location in S3 for long
         dest_bucket_key=f"""{BASE_PATH}/{"{{ 
execution_date.strftime('%Y/%m/%d') }}"}/{FILE_NAME}""",
     )
 
+.. _taskflow/accessing_context_variables:
+
 Accessing context variables in decorated tasks
 ----------------------------------------------
 
diff --git a/tests/decorators/test_bash.py b/tests/decorators/test_bash.py
new file mode 100644
index 0000000000..850f46b0e4
--- /dev/null
+++ b/tests/decorators/test_bash.py
@@ -0,0 +1,453 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+import stat
+from contextlib import nullcontext as no_raise
+from unittest import mock
+
+import pytest
+
+from airflow.decorators import task
+from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.models.renderedtifields import RenderedTaskInstanceFields
+from airflow.utils import timezone
+from airflow.utils.types import NOTSET
+from tests.test_utils.db import clear_db_dags, clear_db_runs, 
clear_rendered_ti_fields
+
+DEFAULT_DATE = timezone.datetime(2023, 1, 1)
+
+
[email protected]_test
+class TestBashDecorator:
+    @pytest.fixture(scope="function", autouse=True)
+    def setup(self, dag_maker):
+        self.dag_maker = dag_maker
+
+        with dag_maker(dag_id="bash_deco_dag") as dag:
+            ...
+
+        self.dag = dag
+
+    def teardown_method(self):
+        clear_db_runs()
+        clear_db_dags()
+        clear_rendered_ti_fields()
+
+    def execute_task(self, task):
+        session = self.dag_maker.session
+        dag_run = self.dag_maker.create_dagrun(
+            run_id=f"bash_deco_test_{DEFAULT_DATE.date()}", session=session
+        )
+        ti = dag_run.get_task_instance(task.operator.task_id, session=session)
+        return_val = task.operator.execute(context={"ti": ti})
+
+        return ti, return_val
+
+    @staticmethod
+    def validate_bash_command_rtif(ti, expected_command):
+        assert 
RenderedTaskInstanceFields.get_templated_fields(ti)["bash_command"] == 
expected_command
+
+    def test_bash_decorator_init(self):
+        """Test the initialization of the @task.bash decorator."""
+
+        with self.dag:
+
+            @task.bash
+            def bash():
+                ...
+
+            bash_task = bash()
+
+        assert bash_task.operator.task_id == "bash"
+        assert bash_task.operator.bash_command == NOTSET
+        assert bash_task.operator.env is None
+        assert bash_task.operator.append_env is False
+        assert bash_task.operator.output_encoding == "utf-8"
+        assert bash_task.operator.skip_on_exit_code == [99]
+        assert bash_task.operator.cwd is None
+        assert bash_task.operator._init_bash_command_not_set is True
+
+    @pytest.mark.parametrize(
+        argnames=["command", "expected_command", "expected_return_val"],
+        argvalues=[
+            pytest.param("echo hello world", "echo hello world", "hello 
world", id="not_templated"),
+            pytest.param(
+                "echo {{ ds }}", f"echo {DEFAULT_DATE.date()}", 
str(DEFAULT_DATE.date()), id="templated"
+            ),
+        ],
+    )
+    def test_bash_command(self, command, expected_command, 
expected_return_val):
+        """Test the runtime bash_command is the function's return string, 
rendered if needed."""
+
+        with self.dag:
+
+            @task.bash
+            def bash():
+                return command
+
+            bash_task = bash()
+
+        assert bash_task.operator.bash_command == NOTSET
+
+        ti, return_val = self.execute_task(bash_task)
+
+        assert bash_task.operator.bash_command == expected_command
+        assert return_val == expected_return_val
+
+        self.validate_bash_command_rtif(ti, expected_command)
+
+    def test_op_args_kwargs(self):
+        """Test op_args and op_kwargs are passed to the bash_command."""
+
+        with self.dag:
+
+            @task.bash
+            def bash(id, other_id):
+                return f"echo hello {id} && echo {other_id}"
+
+            bash_task = bash("world", other_id="2")
+
+        assert bash_task.operator.bash_command == NOTSET
+
+        ti, return_val = self.execute_task(bash_task)
+
+        assert bash_task.operator.bash_command == "echo hello world && echo 2"
+        assert return_val == "2"
+
+        self.validate_bash_command_rtif(ti, "echo hello world && echo 2")
+
+    def test_multiline_command(self):
+        """Verify a multi-line string can be used as a Bash command."""
+        command = """
+        echo {foo} |
+        rev
+        """
+        expected_command = command.format(foo="foo")
+
+        with self.dag:
+
+            @task.bash
+            def bash(foo):
+                return command.format(foo=foo)
+
+            bash_task = bash("foo")
+
+        assert bash_task.operator.bash_command == NOTSET
+
+        ti, return_val = self.execute_task(bash_task)
+
+        assert return_val == "oof"
+        assert bash_task.operator.bash_command == expected_command
+
+        self.validate_bash_command_rtif(ti, expected_command)
+
+    @pytest.mark.parametrize(
+        argnames=["append_env", "user_defined_env", "expected_airflow_home"],
+        argvalues=[
+            pytest.param(False, {"var": "value"}, "", id="no_append_env"),
+            pytest.param(True, {"var": "value"}, "path/to/airflow/home", 
id="append_env"),
+        ],
+    )
+    def test_env_variables(self, append_env, user_defined_env, 
expected_airflow_home, caplog):
+        """Test env variables exist appropriately depending on if the existing 
env variables are allowed."""
+        with self.dag:
+
+            @task.bash(env=user_defined_env, append_env=append_env)
+            def bash():
+                return "echo var=$var; echo AIRFLOW_HOME=$AIRFLOW_HOME;"
+
+            bash_task = bash()
+
+        assert bash_task.operator.bash_command == NOTSET
+
+        with mock.patch.dict("os.environ", {"AIRFLOW_HOME": 
"path/to/airflow/home"}):
+            ti, return_val = self.execute_task(bash_task)
+
+        assert bash_task.operator.env == user_defined_env
+        assert "var=value" in caplog.text
+        assert f"AIRFLOW_HOME={expected_airflow_home}" in caplog.text
+
+        self.validate_bash_command_rtif(ti, "echo var=$var; echo 
AIRFLOW_HOME=$AIRFLOW_HOME;")
+
+    @pytest.mark.parametrize(
+        argnames=["exit_code", "expected"],
+        argvalues=[
+            pytest.param(99, pytest.raises(AirflowSkipException), id="skip"),
+            pytest.param(1, pytest.raises(AirflowException), id="non-zero"),
+            pytest.param(0, no_raise(), id="zero"),
+        ],
+    )
+    def test_exit_code_behavior(self, exit_code, expected):
+        """Test @task.bash tasks behave appropriately relative the exit code 
from the bash_command."""
+
+        with self.dag:
+
+            @task.bash
+            def bash(code):
+                return f"exit {code}"
+
+            bash_task = bash(exit_code)
+
+        assert bash_task.operator.bash_command == NOTSET
+
+        with expected:
+            ti, return_val = self.execute_task(bash_task)
+
+            self.validate_bash_command_rtif(ti, f"exit {exit_code}")
+
+    @pytest.mark.parametrize(
+        argnames=["skip_on_exit_code", "exit_code", "expected"],
+        argvalues=[
+            pytest.param(None, 99, pytest.raises(AirflowSkipException), 
id="default_skip_exit_99"),
+            pytest.param(None, 1, pytest.raises(AirflowException), 
id="default_skip_exit_1"),
+            pytest.param(None, 0, no_raise(), id="default_skip_exit_0"),
+            pytest.param({"skip_on_exit_code": 86}, 0, no_raise(), 
id="skip_86_exit_0"),
+            pytest.param(
+                {"skip_on_exit_code": 100}, 42, 
pytest.raises(AirflowException), id="skip_100_exit_42"
+            ),
+            pytest.param(
+                {"skip_on_exit_code": 100}, 100, 
pytest.raises(AirflowSkipException), id="skip_100_exit_100"
+            ),
+            pytest.param(
+                {"skip_on_exit_code": [100, 101]},
+                100,
+                pytest.raises(AirflowSkipException),
+                id="skip_100-101_exit_100",
+            ),
+            pytest.param(
+                {"skip_on_exit_code": [100, 101]},
+                102,
+                pytest.raises(AirflowException),
+                id="skip_100-101_exit_102",
+            ),
+        ],
+    )
+    def test_skip_on_exit_code_behavior(self, skip_on_exit_code, exit_code, 
expected):
+        """Ensure tasks behave appropriately relative to defined skip exit 
code from the bash_command."""
+
+        with self.dag:
+
+            @task.bash(**skip_on_exit_code if skip_on_exit_code else {})
+            def bash(code):
+                return f"exit {code}"
+
+            bash_task = bash(exit_code)
+
+        assert bash_task.operator.bash_command == NOTSET
+
+        with expected:
+            ti, return_val = self.execute_task(bash_task)
+
+            self.validate_bash_command_rtif(ti, f"exit {exit_code}")
+
+    @pytest.mark.parametrize(
+        argnames=[
+            "user_defined_env",
+            "append_env",
+            "expected_razz",
+            "expected_airflow_home",
+        ],
+        argvalues=[
+            pytest.param(
+                {"razz": "matazz"}, True, "matazz", "path/to/airflow/home", 
id="user_defined_env_and_append"
+            ),
+            pytest.param({"razz": "matazz"}, False, "matazz", "", 
id="user_defined_env_no_append"),
+            pytest.param({}, True, "", "path/to/airflow/home", 
id="no_user_defined_env_and_append"),
+            pytest.param({}, False, "", "", 
id="no_user_defined_env_no_append"),
+        ],
+    )
+    def test_env_variables_in_bash_command_file(
+        self,
+        user_defined_env,
+        append_env,
+        expected_razz,
+        expected_airflow_home,
+        tmp_path,
+        caplog,
+    ):
+        """Test the behavior of user-defined env vars when using an external 
file with a Bash command."""
+        cmd_file = tmp_path / "test_file.sh"
+        cmd_file.write_text("#!/usr/bin/env bash\necho 
AIRFLOW_HOME=$AIRFLOW_HOME\necho razz=$razz\n")
+        cmd_file.chmod(stat.S_IEXEC)
+
+        with self.dag:
+
+            @task.bash(env=user_defined_env, append_env=append_env)
+            def bash(command_file_name):
+                return command_file_name
+
+            with mock.patch.dict("os.environ", {"AIRFLOW_HOME": 
"path/to/airflow/home"}):
+                bash_task = bash(f"{cmd_file} ")
+
+                assert bash_task.operator.bash_command == NOTSET
+
+                ti, return_val = self.execute_task(bash_task)
+
+        assert f"razz={expected_razz}" in caplog.text
+        assert f"AIRFLOW_HOME={expected_airflow_home}" in caplog.text
+        assert return_val == f"razz={expected_razz}"
+        self.validate_bash_command_rtif(ti, f"{cmd_file} ")
+
+    def test_valid_cwd(self, tmp_path):
+        """Test a user-defined working directory can be used."""
+        cwd_path = tmp_path / "test_cwd"
+        cwd_path.mkdir()
+
+        with self.dag:
+
+            @task.bash(cwd=os.fspath(cwd_path))
+            def bash():
+                return "echo foo | tee output.txt"
+
+            bash_task = bash()
+
+        assert bash_task.operator.bash_command == NOTSET
+
+        ti, return_val = self.execute_task(bash_task)
+
+        assert return_val == "foo"
+        assert (cwd_path / "output.txt").read_text().splitlines()[0] == "foo"
+        self.validate_bash_command_rtif(ti, "echo foo | tee output.txt")
+
+    def test_cwd_does_not_exist(self, tmp_path):
+        """Verify task failure for non-existent, user-defined working 
directory."""
+        cwd_path = tmp_path / "test_cwd"
+
+        with self.dag:
+
+            @task.bash(cwd=os.fspath(cwd_path))
+            def bash():
+                return "echo"
+
+            bash_task = bash()
+
+        assert bash_task.operator.bash_command == NOTSET
+
+        with pytest.raises(AirflowException, match=f"Can not find the cwd: 
{cwd_path}"):
+            ti, _ = self.execute_task(bash_task)
+
+            self.validate_bash_command_rtif(ti, "echo")
+
+    def test_cwd_is_file(self, tmp_path):
+        """Verify task failure for user-defined working directory that is 
actually a file."""
+        cwd_file = tmp_path / "test_file.sh"
+        cwd_file.touch()
+
+        with self.dag:
+
+            @task.bash(cwd=os.fspath(cwd_file))
+            def bash():
+                return "echo"
+
+            bash_task = bash()
+
+        assert bash_task.operator.bash_command == NOTSET
+
+        with pytest.raises(AirflowException, match=f"The cwd {cwd_file} must 
be a directory"):
+            ti, _ = self.execute_task(bash_task)
+
+            self.validate_bash_command_rtif(ti, "echo")
+
+    def test_command_not_found(self):
+        """Fail task if executed command is not found on path."""
+        with pytest.raises(
+            AirflowException, match="Bash command failed\\. The command 
returned a non-zero exit code 127\\."
+        ):
+            with self.dag:
+
+                @task.bash
+                def bash():
+                    return "set -e; something-that-isnt-on-path"
+
+                bash_task = bash()
+
+            assert bash_task.operator.bash_command == NOTSET
+
+            ti, _ = self.execute_task(bash_task)
+            self.validate_bash_command_rtif(ti, "set -e; 
something-that-isnt-on-path")
+
+    @pytest.mark.parametrize(argnames="multiple_outputs", argvalues=[True, 
False])
+    def test_multiple_outputs(self, multiple_outputs):
+        """Verify setting `multiple_outputs` for a @task.bash-decorated 
function is ignored."""
+
+        with self.dag:
+
+            @task.bash(multiple_outputs=multiple_outputs)
+            def bash():
+                return "echo"
+
+            with pytest.warns(
+                UserWarning, match="`multiple_outputs` is not supported in 
@task.bash tasks. Ignoring."
+            ):
+                bash_task = bash()
+
+                assert bash_task.operator.bash_command == NOTSET
+
+                ti, _ = self.execute_task(bash_task)
+
+        assert bash_task.operator.multiple_outputs is False
+        self.validate_bash_command_rtif(ti, "echo")
+
+    @pytest.mark.parametrize(
+        argnames=["return_val", "expected"],
+        argvalues=[
+            pytest.param(None, pytest.raises(TypeError), 
id="return_none_typeerror"),
+            pytest.param(1, pytest.raises(TypeError), 
id="return_int_typeerror"),
+            pytest.param(NOTSET, pytest.raises(TypeError), 
id="return_notset_typeerror"),
+            pytest.param(True, pytest.raises(TypeError), 
id="return_boolean_typeerror"),
+            pytest.param("", pytest.raises(TypeError), 
id="return_empty_string_typeerror"),
+            pytest.param("  ", pytest.raises(TypeError), 
id="return_spaces_string_typeerror"),
+            pytest.param(["echo;", "exit 99;"], pytest.raises(TypeError), 
id="return_list_typeerror"),
+            pytest.param("echo", no_raise(), id="return_string_no_error"),
+        ],
+    )
+    def test_callable_return_is_string(self, return_val, expected):
+        """Ensure the returned value from the decorated callable is a 
non-empty string."""
+
+        with self.dag:
+
+            @task.bash
+            def bash():
+                return return_val
+
+            bash_task = bash()
+
+        assert bash_task.operator.bash_command == NOTSET
+
+        with expected:
+            ti, _ = self.execute_task(bash_task)
+
+            self.validate_bash_command_rtif(ti, return_val)
+
+    def test_rtif_updates_upon_failure(self):
+        """Verify RenderedTaskInstanceFields data should contain the rendered 
command even if the task fails."""
+        with self.dag:
+
+            @task.bash
+            def bash():
+                return "{{ ds }}; exit 1;"
+
+            bash_task = bash()
+
+        assert bash_task.operator.bash_command == NOTSET
+
+        with pytest.raises(AirflowException):
+            ti, _ = self.execute_task(bash_task)
+
+            self.validate_bash_command_rtif(ti, f"{DEFAULT_DATE.date()}; exit 
1;")
diff --git a/tests/operators/test_bash.py b/tests/operators/test_bash.py
index 351c6595bf..fb2e4146bd 100644
--- a/tests/operators/test_bash.py
+++ b/tests/operators/test_bash.py
@@ -37,7 +37,24 @@ END_DATE = datetime(2016, 1, 2, tzinfo=timezone.utc)
 INTERVAL = timedelta(hours=12)
 
 
[email protected]()
+def context():
+    yield {"ti": mock.Mock()}
+
+
 class TestBashOperator:
+    def test_bash_operator_init(self):
+        """Test the construction of the operator with its defaults and 
initially-derived attrs."""
+        op = BashOperator(task_id="bash_op", bash_command="echo")
+
+        assert op.bash_command == "echo"
+        assert op.env is None
+        assert op.append_env is False
+        assert op.output_encoding == "utf-8"
+        assert op.skip_on_exit_code == [99]
+        assert op.cwd is None
+        assert op._init_bash_command_not_set is False
+
     @pytest.mark.db_test
     @pytest.mark.parametrize(
         "append_env,user_defined_env,expected_airflow_home",
@@ -105,17 +122,17 @@ class TestBashOperator:
             ("", ""),
         ],
     )
-    def test_return_value(self, val, expected):
+    def test_return_value(self, val, expected, context):
         op = BashOperator(task_id="abc", bash_command=f'set -e; echo "{val}";')
-        line = op.execute({})
+        line = op.execute(context)
         assert line == expected
 
-    def test_raise_exception_on_non_zero_exit_code(self):
+    def test_raise_exception_on_non_zero_exit_code(self, context):
         bash_operator = BashOperator(bash_command="exit 42", 
task_id="test_return_value", dag=None)
         with pytest.raises(
             AirflowException, match="Bash command failed\\. The command 
returned a non-zero exit code 42\\."
         ):
-            bash_operator.execute(context={})
+            bash_operator.execute(context)
 
     def test_task_retries(self):
         bash_operator = BashOperator(
@@ -129,25 +146,25 @@ class TestBashOperator:
 
         assert bash_operator.retries == 0
 
-    def test_command_not_found(self):
+    def test_command_not_found(self, context):
         with pytest.raises(
             AirflowException, match="Bash command failed\\. The command 
returned a non-zero exit code 127\\."
         ):
-            BashOperator(task_id="abc", bash_command="set -e; 
something-that-isnt-on-path").execute({})
+            BashOperator(task_id="abc", bash_command="set -e; 
something-that-isnt-on-path").execute(context)
 
-    def test_unset_cwd(self):
+    def test_unset_cwd(self, context):
         val = "xxxx"
         op = BashOperator(task_id="abc", bash_command=f'set -e; echo "{val}";')
-        line = op.execute({})
+        line = op.execute(context)
         assert line == val
 
-    def test_cwd_does_not_exist(self, tmp_path):
+    def test_cwd_does_not_exist(self, context, tmp_path):
         test_cmd = 'set -e; echo "xxxx" |tee outputs.txt'
         test_cwd_folder = os.fspath(tmp_path / "test_command_with_cwd")
         # There should be no exceptions when creating the operator even the 
`cwd` doesn't exist
         bash_operator = BashOperator(task_id="abc", bash_command=test_cmd, 
cwd=os.fspath(test_cwd_folder))
         with pytest.raises(AirflowException, match=f"Can not find the cwd: 
{test_cwd_folder}"):
-            bash_operator.execute({})
+            bash_operator.execute(context)
 
     def test_cwd_is_file(self, tmp_path):
         test_cmd = 'set -e; echo "xxxx" |tee outputs.txt'
@@ -157,12 +174,14 @@ class TestBashOperator:
         with pytest.raises(AirflowException, match=f"The cwd {tmp_file} must 
be a directory"):
             BashOperator(task_id="abc", bash_command=test_cmd, 
cwd=os.fspath(tmp_file)).execute({})
 
-    def test_valid_cwd(self, tmp_path):
+    def test_valid_cwd(self, context, tmp_path):
         test_cmd = 'set -e; echo "xxxx" |tee outputs.txt'
         test_cwd_path = tmp_path / "test_command_with_cwd"
         test_cwd_path.mkdir()
         # Test everything went alright
-        result = BashOperator(task_id="abc", bash_command=test_cmd, 
cwd=os.fspath(test_cwd_path)).execute({})
+        result = BashOperator(task_id="abc", bash_command=test_cmd, 
cwd=os.fspath(test_cwd_path)).execute(
+            context
+        )
         assert result == "xxxx"
         assert (test_cwd_path / "outputs.txt").read_text().splitlines()[0] == 
"xxxx"
 
@@ -180,23 +199,23 @@ class TestBashOperator:
             ({"skip_on_exit_code": None}, 0, None),
         ],
     )
-    def test_skip(self, extra_kwargs, actual_exit_code, expected_exc):
+    def test_skip(self, extra_kwargs, actual_exit_code, expected_exc, context):
         kwargs = dict(task_id="abc", bash_command=f'set -e; echo "hello 
world"; exit {actual_exit_code};')
         if extra_kwargs:
             kwargs.update(**extra_kwargs)
         if expected_exc is None:
-            BashOperator(**kwargs).execute({})
+            BashOperator(**kwargs).execute(context)
         else:
             with pytest.raises(expected_exc):
-                BashOperator(**kwargs).execute({})
+                BashOperator(**kwargs).execute(context)
 
-    def test_bash_operator_multi_byte_output(self):
+    def test_bash_operator_multi_byte_output(self, context):
         op = BashOperator(
             task_id="test_multi_byte_bash_operator",
             bash_command="echo \u2600",
             output_encoding="utf-8",
         )
-        op.execute(context={})
+        op.execute(context)
 
     @pytest.mark.db_test
     def test_bash_operator_kill(self, dag_maker):

Reply via email to