This is an automated email from the ASF dual-hosted git repository.
joshfell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e920344044 Introduce @task.bash TaskFlow decorator (#30176)
e920344044 is described below
commit e9203440445b754ae9915b716bb2cdf59601cd79
Author: Josh Fell <[email protected]>
AuthorDate: Tue Dec 19 16:51:54 2023 -0500
Introduce @task.bash TaskFlow decorator (#30176)
* Introduce @task.bash TaskFlow decorator
Adding a @task.bash TaskFlow decorator to the collection of existing core
TaskFlow decorators. This particular decorator will use the return value of the
decorated callable as the Bash command to execute using the existing
BashOperator functionality.
---
airflow/decorators/__init__.py | 3 +
airflow/decorators/__init__.pyi | 32 ++
airflow/decorators/bash.py | 100 ++++++
airflow/example_dags/example_bash_decorator.py | 115 +++++++
airflow/operators/bash.py | 50 ++-
docs/apache-airflow/howto/operator/bash.rst | 342 +++++++++++++++----
docs/apache-airflow/tutorial/taskflow.rst | 2 +
tests/decorators/test_bash.py | 453 +++++++++++++++++++++++++
tests/operators/test_bash.py | 53 ++-
9 files changed, 1051 insertions(+), 99 deletions(-)
diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py
index 31bcfb263c..c43b98ee73 100644
--- a/airflow/decorators/__init__.py
+++ b/airflow/decorators/__init__.py
@@ -19,6 +19,7 @@ from __future__ import annotations
from typing import Any, Callable
from airflow.decorators.base import TaskDecorator
+from airflow.decorators.bash import bash_task
from airflow.decorators.branch_external_python import
branch_external_python_task
from airflow.decorators.branch_python import branch_task
from airflow.decorators.branch_virtualenv import branch_virtualenv_task
@@ -47,6 +48,7 @@ __all__ = [
"branch_external_python_task",
"short_circuit_task",
"sensor_task",
+ "bash_task",
"setup",
"teardown",
]
@@ -63,6 +65,7 @@ class TaskDecoratorCollection:
branch_external_python = staticmethod(branch_external_python_task)
short_circuit = staticmethod(short_circuit_task)
sensor = staticmethod(sensor_task)
+ bash = staticmethod(bash_task)
__call__: Any = python # Alias '@task' to '@task.python'.
diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi
index 48cdf61946..238393c034 100644
--- a/airflow/decorators/__init__.pyi
+++ b/airflow/decorators/__init__.pyi
@@ -26,6 +26,7 @@ from typing import Any, Callable, Collection, Container,
Iterable, Mapping, over
from kubernetes.client import models as k8s
from airflow.decorators.base import FParams, FReturn, Task, TaskDecorator
+from airflow.decorators.bash import bash_task
from airflow.decorators.branch_external_python import
branch_external_python_task
from airflow.decorators.branch_python import branch_task
from airflow.decorators.branch_virtualenv import branch_virtualenv_task
@@ -54,6 +55,7 @@ __all__ = [
"branch_external_python_task",
"short_circuit_task",
"sensor_task",
+ "bash_task",
"setup",
"teardown",
]
@@ -638,6 +640,36 @@ class TaskDecoratorCollection:
def pyspark(
self, python_callable: Callable[FParams, FReturn] | None = None
) -> Task[FParams, FReturn]: ...
+ @overload
+ def bash(
+ self,
+ *,
+ env: dict[str, str] | None = None,
+ append_env: bool = False,
+ output_encoding: str = "utf-8",
+ skip_on_exit_code: int = 99,
+ cwd: str | None = None,
+ **kwargs,
+ ) -> TaskDecorator:
+ """Decorator to wrap a callable into a BashOperator task.
+
+ :param bash_command: The command, set of commands or reference to a
bash script
+ (must be '.sh' or '.bash') to be executed. (templated)
+ :param env: If env is not None, it must be a dict that defines the
environment variables for the new
+ process; these are used instead of inheriting the current process
environment, which is the
+ default behavior. (templated)
+ :param append_env: If False(default) uses the environment variables
passed in env params and does not
+ inherit the current process environment. If True, inherits the
environment variables from the current
+ process, and then the environment variables passed by the user will
either update the existing inherited
+ environment variables or the new variables get appended to it
+ :param output_encoding: Output encoding of bash command
+ :param skip_on_exit_code: If task exits with this exit code, leave the
task in ``skipped`` state
+ (default: 99). If set to ``None``, any non-zero exit code will be
treated as a failure.
+ :param cwd: Working directory to execute the command in. If None
(default), the command is run in a
+ temporary directory.
+ """
+ @overload
+ def bash(self, python_callable: Callable[FParams, FReturn]) ->
Task[FParams, FReturn]: ...
task: TaskDecoratorCollection
setup: Callable
diff --git a/airflow/decorators/bash.py b/airflow/decorators/bash.py
new file mode 100644
index 0000000000..70011c3079
--- /dev/null
+++ b/airflow/decorators/bash.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import warnings
+from typing import Any, Callable, Collection, Mapping, Sequence
+
+from airflow.decorators.base import DecoratedOperator, TaskDecorator,
task_decorator_factory
+from airflow.operators.bash import BashOperator
+from airflow.utils.context import Context, context_merge
+from airflow.utils.operator_helpers import determine_kwargs
+from airflow.utils.types import NOTSET
+
+
+class _BashDecoratedOperator(DecoratedOperator, BashOperator):
+ """Wraps a Python callable and uses the callable return value as the Bash
command to be executed.
+
+ :param python_callable: A reference to an object that is callable.
+ :param op_kwargs: A dictionary of keyword arguments that will get unpacked
+ in your function (templated).
+ :param op_args: A list of positional arguments that will get unpacked when
+ calling your callable (templated).
+ """
+
+ template_fields: Sequence[str] = (*DecoratedOperator.template_fields,
*BashOperator.template_fields)
+ template_fields_renderers: dict[str, str] = {
+ **DecoratedOperator.template_fields_renderers,
+ **BashOperator.template_fields_renderers,
+ }
+
+ custom_operator_name: str = "@task.bash"
+
+ def __init__(
+ self,
+ *,
+ python_callable: Callable,
+ op_args: Collection[Any] | None = None,
+ op_kwargs: Mapping[str, Any] | None = None,
+ **kwargs,
+ ) -> None:
+ if kwargs.get("multiple_outputs") is not None:
+ warnings.warn(
+ f"`multiple_outputs` is not supported in
{self.custom_operator_name} tasks. Ignoring.",
+ stacklevel=1,
+ )
+ kwargs.pop("multiple_outputs")
+
+ super().__init__(
+ python_callable=python_callable,
+ op_args=op_args,
+ op_kwargs=op_kwargs,
+ bash_command=NOTSET,
+ **kwargs,
+ )
+
+ def execute(self, context: Context) -> Any:
+ context_merge(context, self.op_kwargs)
+ kwargs = determine_kwargs(self.python_callable, self.op_args, context)
+
+ self.bash_command = self.python_callable(*self.op_args, **kwargs)
+
+ if not isinstance(self.bash_command, str) or self.bash_command.strip()
== "":
+ raise TypeError("The returned value from the TaskFlow callable
must be a non-empty string.")
+
+ return super().execute(context)
+
+
+def bash_task(
+ python_callable: Callable | None = None,
+ **kwargs,
+) -> TaskDecorator:
+ """Wraps a function into a BashOperator.
+
+ Accepts kwargs for operator kwargs. Can be reused in a single DAG. This
function is only used
+ during type checking or auto-completion.
+
+ :param python_callable: Function to decorate.
+
+ :meta private:
+ """
+ return task_decorator_factory(
+ python_callable=python_callable,
+ decorated_operator_class=_BashDecoratedOperator,
+ **kwargs,
+ )
diff --git a/airflow/example_dags/example_bash_decorator.py
b/airflow/example_dags/example_bash_decorator.py
new file mode 100644
index 0000000000..ee8151f337
--- /dev/null
+++ b/airflow/example_dags/example_bash_decorator.py
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pendulum
+
+from airflow.decorators import dag, task
+from airflow.exceptions import AirflowSkipException
+from airflow.models.baseoperator import chain
+from airflow.operators.empty import EmptyOperator
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.utils.weekday import WeekDay
+
+
+@dag(schedule=None, start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
catchup=False)
+def example_bash_decorator():
+ @task.bash
+ def run_me(sleep_seconds: int, task_instance_key_str: str) -> str:
+ return f"echo {task_instance_key_str} && sleep {sleep_seconds}"
+
+ run_me_loop = [run_me.override(task_id=f"runme_{i}")(sleep_seconds=i) for
i in range(3)]
+
+ # [START howto_decorator_bash]
+ @task.bash
+ def run_after_loop() -> str:
+ return "echo 1"
+
+ run_this = run_after_loop()
+ # [END howto_decorator_bash]
+
+ # [START howto_decorator_bash_template]
+ @task.bash
+ def also_run_this() -> str:
+ return 'echo "ti_key={{ task_instance_key_str }}"'
+
+ also_this = also_run_this()
+ # [END howto_decorator_bash_template]
+
+ # [START howto_decorator_bash_context_vars]
+ @task.bash
+ def also_run_this_again(task_instance_key_str) -> str:
+ return f'echo "ti_key={task_instance_key_str}"'
+
+ also_this_again = also_run_this_again()
+ # [END howto_decorator_bash_context_vars]
+
+ # [START howto_decorator_bash_skip]
+ @task.bash
+ def this_will_skip() -> str:
+ return 'echo "hello world"; exit 99;'
+
+ this_skips = this_will_skip()
+ # [END howto_decorator_bash_skip]
+
+ run_this_last = EmptyOperator(task_id="run_this_last",
trigger_rule=TriggerRule.ALL_DONE)
+
+ # [START howto_decorator_bash_conditional]
+ @task.bash
+ def sleep_in(day: str) -> str:
+ if day in (WeekDay.SATURDAY, WeekDay.SUNDAY):
+ return f"sleep {60 * 60}"
+ else:
+ raise AirflowSkipException("No sleeping in today!")
+
+ sleep_in(day="{{ dag_run.logical_date.strftime('%A').lower() }}")
+ # [END howto_decorator_bash_conditional]
+
+ # [START howto_decorator_bash_parametrize]
+ @task.bash(env={"BASE_DIR": "{{ dag_run.logical_date.strftime('%Y/%m/%d')
}}"}, append_env=True)
+ def make_dynamic_dirs(new_dirs: str) -> str:
+ return f"mkdir -p $AIRFLOW_HOME/$BASE_DIR/{new_dirs}"
+
+ make_dynamic_dirs(new_dirs="foo/bar/baz")
+ # [END howto_decorator_bash_parametrize]
+
+ # [START howto_decorator_bash_build_cmd]
+ def _get_files_in_cwd() -> list[str]:
+ from pathlib import Path
+
+ dir_contents = Path.cwd().glob("airflow/example_dags/*.py")
+ files = [str(elem) for elem in dir_contents if elem.is_file()]
+
+ return files
+
+ @task.bash
+ def get_file_stats() -> str:
+ files = _get_files_in_cwd()
+ cmd = "stat "
+ cmd += " ".join(files)
+
+ return cmd
+
+ get_file_stats()
+ # [END howto_decorator_bash_build_cmd]
+
+ chain(run_me_loop, run_this)
+ chain([also_this, also_this_again, this_skips, run_this], run_this_last)
+
+
+example_bash_decorator()
diff --git a/airflow/operators/bash.py b/airflow/operators/bash.py
index 08fdaff1cb..d913b2b282 100644
--- a/airflow/operators/bash.py
+++ b/airflow/operators/bash.py
@@ -21,14 +21,16 @@ import os
import shutil
import warnings
from functools import cached_property
-from typing import TYPE_CHECKING, Container, Sequence
+from typing import TYPE_CHECKING, Container, Sequence, cast
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.hooks.subprocess import SubprocessHook
from airflow.models.baseoperator import BaseOperator
from airflow.utils.operator_helpers import context_to_airflow_vars
+from airflow.utils.types import ArgNotSet
if TYPE_CHECKING:
+ from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context
@@ -44,7 +46,7 @@ class BashOperator(BaseOperator):
will also be pushed to an XCom when the bash command completes
:param bash_command: The command, set of commands or reference to a
- bash script (must be '.sh') to be executed. (templated)
+ Bash script (must be '.sh' or '.bash') to be executed. (templated)
:param env: If env is not None, it must be a dict that defines the
environment variables for the new process; these are used instead
of inheriting the current process environment, which is the default
@@ -53,14 +55,14 @@ class BashOperator(BaseOperator):
and does not inherit the current process environment. If True,
inherits the environment variables
from current passes and then environment variable passed by the user
will either update the existing
inherited environment variables or the new variables gets appended to
it
- :param output_encoding: Output encoding of bash command
+ :param output_encoding: Output encoding of Bash command
:param skip_on_exit_code: If task exits with this exit code, leave the task
in ``skipped`` state (default: 99). If set to ``None``, any non-zero
exit code will be treated as a failure.
:param cwd: Working directory to execute the command in.
If None (default), the command is run in a temporary directory.
- Airflow will evaluate the exit code of the bash command. In general, a
non-zero exit code will result in
+ Airflow will evaluate the exit code of the Bash command. In general, a
non-zero exit code will result in
task failure and zero will result in task success.
Exit code ``99`` (or another set in ``skip_on_exit_code``)
will throw an :class:`airflow.exceptions.AirflowSkipException`, which will
leave the task in ``skipped``
@@ -85,7 +87,6 @@ class BashOperator(BaseOperator):
code. This can be an issue if the non-zero exit arises from a
sub-command. The easiest way of
addressing this is to prefix the command with ``set -e;``
- Example:
.. code-block:: python
bash_command = "set -e; python3 script.py '{{ next_execution_date
}}'"
@@ -131,16 +132,13 @@ class BashOperator(BaseOperator):
template_fields: Sequence[str] = ("bash_command", "env")
template_fields_renderers = {"bash_command": "bash", "env": "json"}
- template_ext: Sequence[str] = (
- ".sh",
- ".bash",
- )
+ template_ext: Sequence[str] = (".sh", ".bash")
ui_color = "#f0ede4"
def __init__(
self,
*,
- bash_command: str,
+ bash_command: str | ArgNotSet,
env: dict[str, str] | None = None,
append_env: bool = False,
output_encoding: str = "utf-8",
@@ -168,11 +166,32 @@ class BashOperator(BaseOperator):
self.cwd = cwd
self.append_env = append_env
+ # When using the @task.bash decorator, the Bash command is not known
until the underlying Python
+ # callable is executed and therefore set to NOTSET initially. This
flag is useful during execution to
+ determine whether the bash_command value needs to be re-rendered.
+ self._init_bash_command_not_set = isinstance(self.bash_command,
ArgNotSet)
+
@cached_property
def subprocess_hook(self):
"""Returns hook for running the bash command."""
return SubprocessHook()
+ @staticmethod
+ def refresh_bash_command(ti: TaskInstance) -> None:
+ """Rewrite the underlying rendered bash_command value for a task
instance in the metadatabase.
+
+ TaskInstance.get_rendered_template_fields() cannot be used because
this will retrieve the
+ RenderedTaskInstanceFields from the metadatabase which doesn't have
the runtime-evaluated bash_command
+ value.
+
+ :meta private:
+ """
+ from airflow.models.renderedtifields import RenderedTaskInstanceFields
+
+ rtif = RenderedTaskInstanceFields(ti) # Templates are rendered by
default here.
+ RenderedTaskInstanceFields.write(rtif)
+ RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id)
+
def get_env(self, context):
"""Build the set of environment variables to be exposed for the bash
command."""
system_env = os.environ.copy()
@@ -200,6 +219,16 @@ class BashOperator(BaseOperator):
if not os.path.isdir(self.cwd):
raise AirflowException(f"The cwd {self.cwd} must be a
directory")
env = self.get_env(context)
+
+ # Because the bash_command value is evaluated at runtime using the
@task.bash decorator, the
+ # RenderedTaskInstanceField data needs to be rewritten and the
bash_command value re-rendered -- the
+ # latter because the returned command from the decorated callable
could contain a Jinja expression.
+ # Both will ensure the correct Bash command is executed and that the
Rendered Template view in the UI
+ # displays the executed command (otherwise it will display as an
ArgNotSet type).
+ if self._init_bash_command_not_set:
+ ti = cast("TaskInstance", context["ti"])
+ self.refresh_bash_command(ti)
+
result = self.subprocess_hook.run_command(
command=[bash_path, "-c", self.bash_command],
env=env,
@@ -212,6 +241,7 @@ class BashOperator(BaseOperator):
raise AirflowException(
f"Bash command failed. The command returned a non-zero exit
code {result.exit_code}."
)
+
return result.output
def on_kill(self) -> None:
diff --git a/docs/apache-airflow/howto/operator/bash.rst
b/docs/apache-airflow/howto/operator/bash.rst
index 10b669099f..50d7286fe1 100644
--- a/docs/apache-airflow/howto/operator/bash.rst
+++ b/docs/apache-airflow/howto/operator/bash.rst
@@ -23,108 +23,265 @@ BashOperator
============
Use the :class:`~airflow.operators.bash.BashOperator` to execute
-commands in a `Bash <https://www.gnu.org/software/bash/>`__ shell.
+commands in a `Bash <https://www.gnu.org/software/bash/>`__ shell. The Bash
command or script to execute is
+determined by:
+
+1. The ``bash_command`` argument when using ``BashOperator``, or
+
+2. If using the TaskFlow decorator, ``@task.bash``, a non-empty string value
returned from the decorated callable.
+
+
+.. tip::
+
+ The ``@task.bash`` decorator is recommended over the classic
``BashOperator`` to execute Bash commands.
+
+
+.. tab-set::
+
+ .. tab-item:: @task.bash
+ :sync: taskflow
+
+ .. exampleinclude::
/../../airflow/example_dags/example_bash_decorator.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_decorator_bash]
+ :end-before: [END howto_decorator_bash]
+
+ .. tab-item:: BashOperator
+ :sync: operator
+
+ .. exampleinclude::
/../../airflow/example_dags/example_bash_operator.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_bash]
+ :end-before: [END howto_operator_bash]
-.. exampleinclude:: /../../airflow/example_dags/example_bash_operator.py
- :language: python
- :dedent: 4
- :start-after: [START howto_operator_bash]
- :end-before: [END howto_operator_bash]
Templating
----------
-You can use :ref:`Jinja templates <concepts:jinja-templating>` to parameterize
the
-``bash_command`` argument.
+You can use :ref:`Jinja templates <concepts:jinja-templating>` to parameterize
the Bash command.
+
+.. tab-set::
+
+ .. tab-item:: @task.bash
+ :sync: taskflow
+
+ .. exampleinclude::
/../../airflow/example_dags/example_bash_decorator.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_decorator_bash_template]
+ :end-before: [END howto_decorator_bash_template]
+
+ .. tab-item:: BashOperator
+ :sync: operator
+
+ .. exampleinclude::
/../../airflow/example_dags/example_bash_operator.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_bash_template]
+ :end-before: [END howto_operator_bash_template]
+
+Using the ``@task.bash`` TaskFlow decorator allows you to return a formatted
string and take advantage of
+having all :ref:`execution context variables directly accessible to decorated
tasks <taskflow/accessing_context_variables>`.
-.. exampleinclude:: /../../airflow/example_dags/example_bash_operator.py
+.. exampleinclude:: /../../airflow/example_dags/example_bash_decorator.py
:language: python
:dedent: 4
- :start-after: [START howto_operator_bash_template]
- :end-before: [END howto_operator_bash_template]
+ :start-after: [START howto_decorator_bash_context_vars]
+ :end-before: [END howto_decorator_bash_context_vars]
+You are encouraged to take advantage of this approach as it fits nicely into
the overall TaskFlow paradigm.
-.. warning::
+.. caution::
- Care should be taken with "user" input or when using Jinja templates in the
- ``bash_command``, as this bash operator does not perform any escaping or
- sanitization of the command.
+ Care should be taken with "user" input when using Jinja templates in the
Bash command as escaping and
+ sanitization of the Bash command is not performed.
- This applies mostly to using "dag_run" conf, as that can be submitted via
- users in the Web UI. Most of the default template variables are not at
- risk.
+ This applies mostly to using ``dag_run.conf``, as that can be submitted
via users in the Web UI. Most of
+ the default template variables are not at risk.
-For example, do **not** do this:
+ For example, do **not** do:
-.. code-block:: python
+ .. tab-set::
- bash_task = BashOperator(
- task_id="bash_task",
- bash_command='echo "Here is the message: \'{{ dag_run.conf["message"]
if dag_run else "" }}\'"',
- )
+ .. tab-item:: @task.bash
+ :sync: taskflow
-Instead, you should pass this via the ``env`` kwarg and use double-quotes
-inside the bash_command, as below:
+ .. code-block:: python
-.. code-block:: python
+ @task.bash
+ def bash_task() -> str:
+ return 'echo "Here is the message: \'{{
dag_run.conf["message"] if dag_run.conf else "" }}\'"'
+
+
+ # Or directly accessing `dag_run.conf`
+ @task.bash
+ def bash_task(dag_run) -> str:
+ message = dag_run.conf["message"] if dag_run.conf else ""
+ return f'echo "here is the message: {message}"'
+
+ .. tab-item:: BashOperator
+ :sync: operator
+
+ .. code-block:: python
+
+ bash_task = BashOperator(
+ task_id="bash_task",
+ bash_command='echo "Here is the message: \'{{
dag_run.conf["message"] if dag_run.conf else "" }}\'"',
+ )
+
+
+ Instead, you should pass this via the ``env`` kwarg and use double-quotes
inside the Bash command.
+
+ .. tab-set::
+
+ .. tab-item:: @task.bash
+ :sync: taskflow
+
+ .. code-block:: python
+
+ @task.bash(env={"message": '{{ dag_run.conf["message"] if
dag_run.conf else "" }}'})
+ def bash_task() -> str:
+ return "echo \"here is the message: '$message'\""
+
+ .. tab-item:: BashOperator
+ :sync: operator
+
+ .. code-block:: python
+
+ bash_task = BashOperator(
+ task_id="bash_task",
+ bash_command="echo \"here is the message: '$message'\"",
+ env={"message": '{{ dag_run.conf["message"] if
dag_run.conf else "" }}'},
+ )
- bash_task = BashOperator(
- task_id="bash_task",
- bash_command="echo \"here is the message: '$message'\"",
- env={"message": '{{ dag_run.conf["message"] if dag_run else "" }}'},
- )
Skipping
--------
-In general a non-zero exit code produces an AirflowException and thus a task
failure. In cases where it is desirable
-to instead have the task end in a ``skipped`` state, you can exit with code
``99`` (or with another exit code if you
-pass ``skip_exit_code``).
+In general a non-zero exit code produces an AirflowException and thus a task
failure. In cases where it is
+desirable to instead have the task end in a ``skipped`` state, you can exit
with code ``99`` (or with another
+exit code if you pass ``skip_on_exit_code``).
-.. exampleinclude:: /../../airflow/example_dags/example_bash_operator.py
- :language: python
- :start-after: [START howto_operator_bash_skip]
- :end-before: [END howto_operator_bash_skip]
+.. tab-set::
+
+ .. tab-item:: @task.bash
+ :sync: taskflow
+
+ .. exampleinclude::
/../../airflow/example_dags/example_bash_decorator.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_decorator_bash_skip]
+ :end-before: [END howto_decorator_bash_skip]
+
+ .. tab-item:: BashOperator
+ :sync: operator
+
+ .. exampleinclude::
/../../airflow/example_dags/example_bash_operator.py
+ :language: python
+ :start-after: [START howto_operator_bash_skip]
+ :end-before: [END howto_operator_bash_skip]
-Troubleshooting
----------------
+Executing commands from files
+-----------------------------
+Both the ``BashOperator`` and ``@task.bash`` TaskFlow decorator enable you to
execute Bash commands stored
+in files. The files **must** have a ``.sh`` or ``.bash`` extension.
+
+Note the space after the script name (more on this in the next section).
+
+.. tab-set::
+
+ .. tab-item:: @task.bash
+ :sync: taskflow
+
+ .. code-block:: python
+ :emphasize-lines: 3
+
+ @task.bash
+ def run_command_from_script() -> str:
+ return "$AIRFLOW_HOME/scripts/example.sh "
+
+
+ run_script = run_command_from_script()
+
+ .. tab-item:: BashOperator
+ :sync: operator
+
+ .. code-block:: python
+ :emphasize-lines: 3
+
+ run_script = BashOperator(
+ task_id="run_command_from_script",
+ bash_command="$AIRFLOW_HOME/scripts/example.sh ",
+ )
+
Jinja template not found
""""""""""""""""""""""""
-Add a space after the script name when directly calling a Bash script with
-the ``bash_command`` argument. This is because Airflow tries to apply a Jinja
-template to it, which will fail.
+If you encounter a "Template not found" exception when trying to execute a
Bash script, add a space after the
+script name. This is because Airflow tries to apply a Jinja template to it,
which will fail.
+
+.. tab-set::
+
+ .. tab-item:: @task.bash
+ :sync: taskflow
+
+ .. code-block:: python
+
+ @task.bash
+ def bash_example():
+ # This fails with 'Jinja template not found' error
+ # return "/home/batcher/test.sh",
+ # This works (has a space after)
+ return "/home/batcher/test.sh "
-.. code-block:: python
+ .. tab-item:: BashOperator
+ :sync: operator
- t2 = BashOperator(
- task_id="bash_example",
- # This fails with 'Jinja template not found' error
- # bash_command="/home/batcher/test.sh",
- # This works (has a space after)
- bash_command="/home/batcher/test.sh ",
- dag=dag,
- )
+ .. code-block:: python
-However, if you want to use templating in your bash script, do not add the
space
-and instead put your bash script in a location relative to the directory
containing
+ BashOperator(
+ task_id="bash_example",
+ # This fails with 'Jinja template not found' error
+ # bash_command="/home/batcher/test.sh",
+ # This works (has a space after)
+ bash_command="/home/batcher/test.sh ",
+ )
+
+However, if you want to use templating in your Bash script, do not add the
space
+and instead put your Bash script in a location relative to the directory
containing
the DAG file. So if your DAG file is in
``/usr/local/airflow/dags/test_dag.py``, you can
move your ``test.sh`` file to any location under ``/usr/local/airflow/dags/``
(Example:
``/usr/local/airflow/dags/scripts/test.sh``) and pass the relative path to
``bash_command``
as shown below:
-.. code-block:: python
+.. tab-set::
+
+ .. tab-item:: @task.bash
+ :sync: taskflow
+
+ .. code-block:: python
+
+ @task.bash
+ def bash_example():
+ # "scripts" folder is under "/usr/local/airflow/dags"
+ return "scripts/test.sh"
- t2 = BashOperator(
- task_id="bash_example",
- # "scripts" folder is under "/usr/local/airflow/dags"
- bash_command="scripts/test.sh",
- dag=dag,
- )
+ .. tab-item:: BashOperator
+ :sync: operator
-Creating separate folder for bash scripts may be desirable for many reasons,
like
+ .. code-block:: python
+
+ t2 = BashOperator(
+ task_id="bash_example",
+ # "scripts" folder is under "/usr/local/airflow/dags"
+ bash_command="scripts/test.sh",
+ )
+
+Creating separate folder for Bash scripts may be desirable for many reasons,
like
separating your script's logic and pipeline code, allowing for proper code
highlighting
in files composed in different languages, and general flexibility in
structuring
pipelines.
@@ -132,17 +289,58 @@ pipelines.
It is also possible to define your ``template_searchpath`` as pointing to any
folder
locations in the DAG constructor call.
-Example:
+.. tab-set::
+
+ .. tab-item:: @task.bash
+ :sync: taskflow
+
+ .. code-block:: python
+ :emphasize-lines: 1
+
+ @dag(..., template_searchpath="/opt/scripts")
+ def example_bash_dag():
+ @task.bash
+ def bash_example():
+ return "test.sh "
+
+ .. tab-item:: BashOperator
+ :sync: operator
+
+ .. code-block:: python
+ :emphasize-lines: 1
+
+ with DAG("example_bash_dag", ...,
template_searchpath="/opt/scripts"):
+ t2 = BashOperator(
+ task_id="bash_example",
+ bash_command="test.sh ",
+ )
+
+Enriching Bash with Python
+--------------------------
-.. code-block:: python
+The ``@task.bash`` TaskFlow decorator allows you to combine both Bash and
Python into a powerful combination
+within a task.
+
+Using Python conditionals, other function calls, etc. within a ``@task.bash``
task can help define, augment,
+or even build the Bash command(s) to execute.
+
+For example, use conditional logic to determine task behavior:
+
+.. exampleinclude:: /../../airflow/example_dags/example_bash_decorator.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_decorator_bash_conditional]
+ :end-before: [END howto_decorator_bash_conditional]
+
+Or call a function to help build a Bash command:
+
+.. exampleinclude:: /../../airflow/example_dags/example_bash_decorator.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_decorator_bash_build_cmd]
+ :end-before: [END howto_decorator_bash_build_cmd]
- dag = DAG("example_bash_dag", template_searchpath="/opt/scripts")
- t2 = BashOperator(
- task_id="bash_example",
- # "test.sh" is a file under "/opt/scripts"
- bash_command="test.sh ",
- dag=dag,
- )
+There are numerous possibilities with this type of pre-execution enrichment.
.. _howto/operator:BashSensor:
diff --git a/docs/apache-airflow/tutorial/taskflow.rst
b/docs/apache-airflow/tutorial/taskflow.rst
index 08967bdde3..5d71576b59 100644
--- a/docs/apache-airflow/tutorial/taskflow.rst
+++ b/docs/apache-airflow/tutorial/taskflow.rst
@@ -576,6 +576,8 @@ task to copy the same file to a date-partitioned storage
location in S3 for long
dest_bucket_key=f"""{BASE_PATH}/{"{{
execution_date.strftime('%Y/%m/%d') }}"}/{FILE_NAME}""",
)
+.. _taskflow/accessing_context_variables:
+
Accessing context variables in decorated tasks
----------------------------------------------
diff --git a/tests/decorators/test_bash.py b/tests/decorators/test_bash.py
new file mode 100644
index 0000000000..850f46b0e4
--- /dev/null
+++ b/tests/decorators/test_bash.py
@@ -0,0 +1,453 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+import stat
+from contextlib import nullcontext as no_raise
+from unittest import mock
+
+import pytest
+
+from airflow.decorators import task
+from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.models.renderedtifields import RenderedTaskInstanceFields
+from airflow.utils import timezone
+from airflow.utils.types import NOTSET
+from tests.test_utils.db import clear_db_dags, clear_db_runs,
clear_rendered_ti_fields
+
+DEFAULT_DATE = timezone.datetime(2023, 1, 1)
+
+
[email protected]_test
+class TestBashDecorator:
+ @pytest.fixture(scope="function", autouse=True)
+ def setup(self, dag_maker):
+ self.dag_maker = dag_maker
+
+ with dag_maker(dag_id="bash_deco_dag") as dag:
+ ...
+
+ self.dag = dag
+
+ def teardown_method(self):
+ clear_db_runs()
+ clear_db_dags()
+ clear_rendered_ti_fields()
+
+ def execute_task(self, task):
+ session = self.dag_maker.session
+ dag_run = self.dag_maker.create_dagrun(
+ run_id=f"bash_deco_test_{DEFAULT_DATE.date()}", session=session
+ )
+ ti = dag_run.get_task_instance(task.operator.task_id, session=session)
+ return_val = task.operator.execute(context={"ti": ti})
+
+ return ti, return_val
+
+ @staticmethod
+ def validate_bash_command_rtif(ti, expected_command):
+ assert
RenderedTaskInstanceFields.get_templated_fields(ti)["bash_command"] ==
expected_command
+
+ def test_bash_decorator_init(self):
+ """Test the initialization of the @task.bash decorator."""
+
+ with self.dag:
+
+ @task.bash
+ def bash():
+ ...
+
+ bash_task = bash()
+
+ assert bash_task.operator.task_id == "bash"
+ assert bash_task.operator.bash_command == NOTSET
+ assert bash_task.operator.env is None
+ assert bash_task.operator.append_env is False
+ assert bash_task.operator.output_encoding == "utf-8"
+ assert bash_task.operator.skip_on_exit_code == [99]
+ assert bash_task.operator.cwd is None
+ assert bash_task.operator._init_bash_command_not_set is True
+
+ @pytest.mark.parametrize(
+ argnames=["command", "expected_command", "expected_return_val"],
+ argvalues=[
+ pytest.param("echo hello world", "echo hello world", "hello
world", id="not_templated"),
+ pytest.param(
+ "echo {{ ds }}", f"echo {DEFAULT_DATE.date()}",
str(DEFAULT_DATE.date()), id="templated"
+ ),
+ ],
+ )
+ def test_bash_command(self, command, expected_command,
expected_return_val):
+ """Test the runtime bash_command is the function's return string,
rendered if needed."""
+
+ with self.dag:
+
+ @task.bash
+ def bash():
+ return command
+
+ bash_task = bash()
+
+ assert bash_task.operator.bash_command == NOTSET
+
+ ti, return_val = self.execute_task(bash_task)
+
+ assert bash_task.operator.bash_command == expected_command
+ assert return_val == expected_return_val
+
+ self.validate_bash_command_rtif(ti, expected_command)
+
+ def test_op_args_kwargs(self):
+ """Test op_args and op_kwargs are passed to the bash_command."""
+
+ with self.dag:
+
+ @task.bash
+ def bash(id, other_id):
+ return f"echo hello {id} && echo {other_id}"
+
+ bash_task = bash("world", other_id="2")
+
+ assert bash_task.operator.bash_command == NOTSET
+
+ ti, return_val = self.execute_task(bash_task)
+
+ assert bash_task.operator.bash_command == "echo hello world && echo 2"
+ assert return_val == "2"
+
+ self.validate_bash_command_rtif(ti, "echo hello world && echo 2")
+
+ def test_multiline_command(self):
+ """Verify a multi-line string can be used as a Bash command."""
+ command = """
+ echo {foo} |
+ rev
+ """
+ excepted_command = command.format(foo="foo")
+
+ with self.dag:
+
+ @task.bash
+ def bash(foo):
+ return command.format(foo=foo)
+
+ bash_task = bash("foo")
+
+ assert bash_task.operator.bash_command == NOTSET
+
+ ti, return_val = self.execute_task(bash_task)
+
+ assert return_val == "oof"
+ assert bash_task.operator.bash_command == excepted_command
+
+ self.validate_bash_command_rtif(ti, excepted_command)
+
+ @pytest.mark.parametrize(
+ argnames=["append_env", "user_defined_env", "expected_airflow_home"],
+ argvalues=[
+ pytest.param(False, {"var": "value"}, "", id="no_append_env"),
+ pytest.param(True, {"var": "value"}, "path/to/airflow/home",
id="append_env"),
+ ],
+ )
+ def test_env_variables(self, append_env, user_defined_env,
expected_airflow_home, caplog):
+ """Test env variables exist appropriately depending on if the existing
env variables are allowed."""
+ with self.dag:
+
+ @task.bash(env=user_defined_env, append_env=append_env)
+ def bash():
+ return "echo var=$var; echo AIRFLOW_HOME=$AIRFLOW_HOME;"
+
+ bash_task = bash()
+
+ assert bash_task.operator.bash_command == NOTSET
+
+ with mock.patch.dict("os.environ", {"AIRFLOW_HOME":
"path/to/airflow/home"}):
+ ti, return_val = self.execute_task(bash_task)
+
+ assert bash_task.operator.env == user_defined_env
+ assert "var=value" in caplog.text
+ assert f"AIRFLOW_HOME={expected_airflow_home}" in caplog.text
+
+ self.validate_bash_command_rtif(ti, "echo var=$var; echo
AIRFLOW_HOME=$AIRFLOW_HOME;")
+
+ @pytest.mark.parametrize(
+ argnames=["exit_code", "expected"],
+ argvalues=[
+ pytest.param(99, pytest.raises(AirflowSkipException), id="skip"),
+ pytest.param(1, pytest.raises(AirflowException), id="non-zero"),
+ pytest.param(0, no_raise(), id="zero"),
+ ],
+ )
+ def test_exit_code_behavior(self, exit_code, expected):
+ """Test @task.bash tasks behave appropriately relative the exit code
from the bash_command."""
+
+ with self.dag:
+
+ @task.bash
+ def bash(code):
+ return f"exit {code}"
+
+ bash_task = bash(exit_code)
+
+ assert bash_task.operator.bash_command == NOTSET
+
+ with expected:
+ ti, return_val = self.execute_task(bash_task)
+
+ self.validate_bash_command_rtif(ti, f"exit {exit_code}")
+
+ @pytest.mark.parametrize(
+ argnames=["skip_on_exit_code", "exit_code", "expected"],
+ argvalues=[
+ pytest.param(None, 99, pytest.raises(AirflowSkipException),
id="default_skip_exit_99"),
+ pytest.param(None, 1, pytest.raises(AirflowException),
id="default_skip_exit_1"),
+ pytest.param(None, 0, no_raise(), id="default_skip_exit_0"),
+ pytest.param({"skip_on_exit_code": 86}, 0, no_raise(),
id="skip_86_exit_0"),
+ pytest.param(
+ {"skip_on_exit_code": 100}, 42,
pytest.raises(AirflowException), id="skip_100_exit_42"
+ ),
+ pytest.param(
+ {"skip_on_exit_code": 100}, 100,
pytest.raises(AirflowSkipException), id="skip_100_exit_100"
+ ),
+ pytest.param(
+ {"skip_on_exit_code": [100, 101]},
+ 100,
+ pytest.raises(AirflowSkipException),
+ id="skip_100-101_exit_100",
+ ),
+ pytest.param(
+ {"skip_on_exit_code": [100, 101]},
+ 102,
+ pytest.raises(AirflowException),
+ id="skip_100-101_exit_102",
+ ),
+ ],
+ )
+ def test_skip_on_exit_code_behavior(self, skip_on_exit_code, exit_code,
expected):
+ """Ensure tasks behave appropriately relative to defined skip exit
code from the bash_command."""
+
+ with self.dag:
+
+ @task.bash(**skip_on_exit_code if skip_on_exit_code else {})
+ def bash(code):
+ return f"exit {code}"
+
+ bash_task = bash(exit_code)
+
+ assert bash_task.operator.bash_command == NOTSET
+
+ with expected:
+ ti, return_val = self.execute_task(bash_task)
+
+ self.validate_bash_command_rtif(ti, f"exit {exit_code}")
+
+ @pytest.mark.parametrize(
+ argnames=[
+ "user_defined_env",
+ "append_env",
+ "expected_razz",
+ "expected_airflow_home",
+ ],
+ argvalues=[
+ pytest.param(
+ {"razz": "matazz"}, True, "matazz", "path/to/airflow/home",
id="user_defined_env_and_append"
+ ),
+ pytest.param({"razz": "matazz"}, False, "matazz", "",
id="user_defined_env_no_append"),
+ pytest.param({}, True, "", "path/to/airflow/home",
id="no_user_defined_env_and_append"),
+ pytest.param({}, False, "", "",
id="no_user_defined_env_no_append"),
+ ],
+ )
+ def test_env_variables_in_bash_command_file(
+ self,
+ user_defined_env,
+ append_env,
+ expected_razz,
+ expected_airflow_home,
+ tmp_path,
+ caplog,
+ ):
+ """Test the behavior of user-defined env vars when using an external
file with a Bash command."""
+ cmd_file = tmp_path / "test_file.sh"
+ cmd_file.write_text("#!/usr/bin/env bash\necho
AIRFLOW_HOME=$AIRFLOW_HOME\necho razz=$razz\n")
+ cmd_file.chmod(stat.S_IEXEC)
+
+ with self.dag:
+
+ @task.bash(env=user_defined_env, append_env=append_env)
+ def bash(command_file_name):
+ return command_file_name
+
+ with mock.patch.dict("os.environ", {"AIRFLOW_HOME":
"path/to/airflow/home"}):
+ bash_task = bash(f"{cmd_file} ")
+
+ assert bash_task.operator.bash_command == NOTSET
+
+ ti, return_val = self.execute_task(bash_task)
+
+ assert f"razz={expected_razz}" in caplog.text
+ assert f"AIRFLOW_HOME={expected_airflow_home}" in caplog.text
+ assert return_val == f"razz={expected_razz}"
+ self.validate_bash_command_rtif(ti, f"{cmd_file} ")
+
+ def test_valid_cwd(self, tmp_path):
+ """Test a user-defined working directory can be used."""
+ cwd_path = tmp_path / "test_cwd"
+ cwd_path.mkdir()
+
+ with self.dag:
+
+ @task.bash(cwd=os.fspath(cwd_path))
+ def bash():
+ return "echo foo | tee output.txt"
+
+ bash_task = bash()
+
+ assert bash_task.operator.bash_command == NOTSET
+
+ ti, return_val = self.execute_task(bash_task)
+
+ assert return_val == "foo"
+ assert (cwd_path / "output.txt").read_text().splitlines()[0] == "foo"
+ self.validate_bash_command_rtif(ti, "echo foo | tee output.txt")
+
+ def test_cwd_does_not_exist(self, tmp_path):
+ """Verify task failure for non-existent, user-defined working
directory."""
+ cwd_path = tmp_path / "test_cwd"
+
+ with self.dag:
+
+ @task.bash(cwd=os.fspath(cwd_path))
+ def bash():
+ return "echo"
+
+ bash_task = bash()
+
+ assert bash_task.operator.bash_command == NOTSET
+
+ with pytest.raises(AirflowException, match=f"Can not find the cwd:
{cwd_path}"):
+ ti, _ = self.execute_task(bash_task)
+
+ self.validate_bash_command_rtif(ti, "echo")
+
+ def test_cwd_is_file(self, tmp_path):
+ """Verify task failure for user-defined working directory that is
actually a file."""
+ cwd_file = tmp_path / "test_file.sh"
+ cwd_file.touch()
+
+ with self.dag:
+
+ @task.bash(cwd=os.fspath(cwd_file))
+ def bash():
+ return "echo"
+
+ bash_task = bash()
+
+ assert bash_task.operator.bash_command == NOTSET
+
+ with pytest.raises(AirflowException, match=f"The cwd {cwd_file} must
be a directory"):
+ ti, _ = self.execute_task(bash_task)
+
+ self.validate_bash_command_rtif(ti, "echo")
+
+ def test_command_not_found(self):
+ """Fail task if executed command is not found on path."""
+ with pytest.raises(
+ AirflowException, match="Bash command failed\\. The command
returned a non-zero exit code 127\\."
+ ):
+ with self.dag:
+
+ @task.bash
+ def bash():
+ return "set -e; something-that-isnt-on-path"
+
+ bash_task = bash()
+
+ assert bash_task.operator.bash_command == NOTSET
+
+ ti, _ = self.execute_task(bash_task)
+ self.validate_bash_command_rtif(ti, "set -e;
something-that-isnt-on-path")
+
+ @pytest.mark.parametrize(argnames="multiple_outputs", argvalues=[True,
False])
+ def test_multiple_outputs(self, multiple_outputs):
+ """Verify setting `multiple_outputs` for a @task.bash-decorated
function is ignored."""
+
+ with self.dag:
+
+ @task.bash(multiple_outputs=multiple_outputs)
+ def bash():
+ return "echo"
+
+ with pytest.warns(
+ UserWarning, match="`multiple_outputs` is not supported in
@task.bash tasks. Ignoring."
+ ):
+ bash_task = bash()
+
+ assert bash_task.operator.bash_command == NOTSET
+
+ ti, _ = self.execute_task(bash_task)
+
+ assert bash_task.operator.multiple_outputs is False
+ self.validate_bash_command_rtif(ti, "echo")
+
+ @pytest.mark.parametrize(
+ argnames=["return_val", "expected"],
+ argvalues=[
+ pytest.param(None, pytest.raises(TypeError),
id="return_none_typeerror"),
+ pytest.param(1, pytest.raises(TypeError),
id="return_int_typeerror"),
+ pytest.param(NOTSET, pytest.raises(TypeError),
id="return_notset_typeerror"),
+ pytest.param(True, pytest.raises(TypeError),
id="return_boolean_typeerror"),
+ pytest.param("", pytest.raises(TypeError),
id="return_empty_string_typerror"),
+ pytest.param(" ", pytest.raises(TypeError),
id="return_spaces_string_typerror"),
+ pytest.param(["echo;", "exit 99;"], pytest.raises(TypeError),
id="return_list_typerror"),
+ pytest.param("echo", no_raise(), id="return_string_no_error"),
+ ],
+ )
+ def test_callable_return_is_string(self, return_val, expected):
+ """Ensure the returned value from the decorated callable is a
non-empty string."""
+
+ with self.dag:
+
+ @task.bash
+ def bash():
+ return return_val
+
+ bash_task = bash()
+
+ assert bash_task.operator.bash_command == NOTSET
+
+ with expected:
+ ti, _ = self.execute_task(bash_task)
+
+ self.validate_bash_command_rtif(ti, return_val)
+
+ def test_rtif_updates_upon_failure(self):
+ """Veriy RenderedTaskInstanceField data should contain the rendered
command even if the task fails."""
+ with self.dag:
+
+ @task.bash
+ def bash():
+ return "{{ ds }}; exit 1;"
+
+ bash_task = bash()
+
+ assert bash_task.operator.bash_command == NOTSET
+
+ with pytest.raises(AirflowException):
+ ti, _ = self.execute_task(bash_task)
+
+ self.validate_bash_command_rtif(ti, f"{DEFAULT_DATE.date()}; exit
1;")
diff --git a/tests/operators/test_bash.py b/tests/operators/test_bash.py
index 351c6595bf..fb2e4146bd 100644
--- a/tests/operators/test_bash.py
+++ b/tests/operators/test_bash.py
@@ -37,7 +37,24 @@ END_DATE = datetime(2016, 1, 2, tzinfo=timezone.utc)
INTERVAL = timedelta(hours=12)
[email protected]()
+def context():
+ yield {"ti": mock.Mock()}
+
+
class TestBashOperator:
+ def test_bash_operator_init(self):
+ """Test the construction of the operator with its defaults and
initially-derived attrs."""
+ op = BashOperator(task_id="bash_op", bash_command="echo")
+
+ assert op.bash_command == "echo"
+ assert op.env is None
+ assert op.append_env is False
+ assert op.output_encoding == "utf-8"
+ assert op.skip_on_exit_code == [99]
+ assert op.cwd is None
+ assert op._init_bash_command_not_set is False
+
@pytest.mark.db_test
@pytest.mark.parametrize(
"append_env,user_defined_env,expected_airflow_home",
@@ -105,17 +122,17 @@ class TestBashOperator:
("", ""),
],
)
- def test_return_value(self, val, expected):
+ def test_return_value(self, val, expected, context):
op = BashOperator(task_id="abc", bash_command=f'set -e; echo "{val}";')
- line = op.execute({})
+ line = op.execute(context)
assert line == expected
- def test_raise_exception_on_non_zero_exit_code(self):
+ def test_raise_exception_on_non_zero_exit_code(self, context):
bash_operator = BashOperator(bash_command="exit 42",
task_id="test_return_value", dag=None)
with pytest.raises(
AirflowException, match="Bash command failed\\. The command
returned a non-zero exit code 42\\."
):
- bash_operator.execute(context={})
+ bash_operator.execute(context)
def test_task_retries(self):
bash_operator = BashOperator(
@@ -129,25 +146,25 @@ class TestBashOperator:
assert bash_operator.retries == 0
- def test_command_not_found(self):
+ def test_command_not_found(self, context):
with pytest.raises(
AirflowException, match="Bash command failed\\. The command
returned a non-zero exit code 127\\."
):
- BashOperator(task_id="abc", bash_command="set -e;
something-that-isnt-on-path").execute({})
+ BashOperator(task_id="abc", bash_command="set -e;
something-that-isnt-on-path").execute(context)
- def test_unset_cwd(self):
+ def test_unset_cwd(self, context):
val = "xxxx"
op = BashOperator(task_id="abc", bash_command=f'set -e; echo "{val}";')
- line = op.execute({})
+ line = op.execute(context)
assert line == val
- def test_cwd_does_not_exist(self, tmp_path):
+ def test_cwd_does_not_exist(self, context, tmp_path):
test_cmd = 'set -e; echo "xxxx" |tee outputs.txt'
test_cwd_folder = os.fspath(tmp_path / "test_command_with_cwd")
# There should be no exceptions when creating the operator even the
`cwd` doesn't exist
bash_operator = BashOperator(task_id="abc", bash_command=test_cmd,
cwd=os.fspath(test_cwd_folder))
with pytest.raises(AirflowException, match=f"Can not find the cwd:
{test_cwd_folder}"):
- bash_operator.execute({})
+ bash_operator.execute(context)
def test_cwd_is_file(self, tmp_path):
test_cmd = 'set -e; echo "xxxx" |tee outputs.txt'
@@ -157,12 +174,14 @@ class TestBashOperator:
with pytest.raises(AirflowException, match=f"The cwd {tmp_file} must
be a directory"):
BashOperator(task_id="abc", bash_command=test_cmd,
cwd=os.fspath(tmp_file)).execute({})
- def test_valid_cwd(self, tmp_path):
+ def test_valid_cwd(self, context, tmp_path):
test_cmd = 'set -e; echo "xxxx" |tee outputs.txt'
test_cwd_path = tmp_path / "test_command_with_cwd"
test_cwd_path.mkdir()
# Test everything went alright
- result = BashOperator(task_id="abc", bash_command=test_cmd,
cwd=os.fspath(test_cwd_path)).execute({})
+ result = BashOperator(task_id="abc", bash_command=test_cmd,
cwd=os.fspath(test_cwd_path)).execute(
+ context
+ )
assert result == "xxxx"
assert (test_cwd_path / "outputs.txt").read_text().splitlines()[0] ==
"xxxx"
@@ -180,23 +199,23 @@ class TestBashOperator:
({"skip_on_exit_code": None}, 0, None),
],
)
- def test_skip(self, extra_kwargs, actual_exit_code, expected_exc):
+ def test_skip(self, extra_kwargs, actual_exit_code, expected_exc, context):
kwargs = dict(task_id="abc", bash_command=f'set -e; echo "hello
world"; exit {actual_exit_code};')
if extra_kwargs:
kwargs.update(**extra_kwargs)
if expected_exc is None:
- BashOperator(**kwargs).execute({})
+ BashOperator(**kwargs).execute(context)
else:
with pytest.raises(expected_exc):
- BashOperator(**kwargs).execute({})
+ BashOperator(**kwargs).execute(context)
- def test_bash_operator_multi_byte_output(self):
+ def test_bash_operator_multi_byte_output(self, context):
op = BashOperator(
task_id="test_multi_byte_bash_operator",
bash_command="echo \u2600",
output_encoding="utf-8",
)
- op.execute(context={})
+ op.execute(context)
@pytest.mark.db_test
def test_bash_operator_kill(self, dag_maker):