This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cd69f8efc0 Added `output_processor` parameter to `BashProcessor`
(#40843)
cd69f8efc0 is described below
commit cd69f8efc0e26507518757ed4a357cba74cc8a13
Author: David Blain <[email protected]>
AuthorDate: Mon Aug 5 12:06:23 2024 +0200
Added `output_processor` parameter to `BashProcessor` (#40843)
---------
Co-authored-by: David Blain <[email protected]>
Co-authored-by: Shahar Epstein <[email protected]>
---
airflow/operators/bash.py | 13 ++++++---
docs/apache-airflow/howto/operator/bash.rst | 42 +++++++++++++++++++++++++++++
tests/operators/test_bash.py | 11 ++++++++
3 files changed, 63 insertions(+), 3 deletions(-)
diff --git a/airflow/operators/bash.py b/airflow/operators/bash.py
index 18ea9eb9d4..2ec0341a0d 100644
--- a/airflow/operators/bash.py
+++ b/airflow/operators/bash.py
@@ -21,7 +21,7 @@ import os
import shutil
import warnings
from functools import cached_property
-from typing import TYPE_CHECKING, Container, Sequence, cast
+from typing import TYPE_CHECKING, Any, Callable, Container, Sequence, cast
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.hooks.subprocess import SubprocessHook
@@ -63,6 +63,8 @@ class BashOperator(BaseOperator):
If None (default), the command is run in a temporary directory.
To use current DAG folder as the working directory,
you might set template ``{{ dag_run.dag.folder }}``.
+ :param output_processor: Function to further process the output of the
bash script
+ (default is lambda output: output).
Airflow will evaluate the exit code of the Bash command. In general, a
non-zero exit code will result in
task failure and zero will result in task success.
@@ -130,6 +132,9 @@ class BashOperator(BaseOperator):
env={"message": '{{ dag_run.conf["message"] if dag_run else ""
}}'},
)
+ .. versionadded:: 2.10.0
+ The ``output_processor`` parameter.
+
"""
template_fields: Sequence[str] = ("bash_command", "env", "cwd")
@@ -147,6 +152,7 @@ class BashOperator(BaseOperator):
skip_exit_code: int | None = None,
skip_on_exit_code: int | Container[int] | None = 99,
cwd: str | None = None,
+ output_processor: Callable[[str], Any] = lambda result: result,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -167,6 +173,7 @@ class BashOperator(BaseOperator):
)
self.cwd = cwd
self.append_env = append_env
+ self.output_processor = output_processor
# When using the @task.bash decorator, the Bash command is not known
until the underlying Python
# callable is executed and therefore set to NOTSET initially. This
flag is useful during execution to
@@ -221,7 +228,7 @@ class BashOperator(BaseOperator):
raise AirflowException(f"The cwd {self.cwd} must be a
directory")
env = self.get_env(context)
- # Because the bash_command value is evaluated at runtime using the
@tash.bash decorator, the
+ # Because the bash_command value is evaluated at runtime using the
@task.bash decorator, the
# RenderedTaskInstanceField data needs to be rewritten and the
bash_command value re-rendered -- the
# latter because the returned command from the decorated callable
could contain a Jinja expression.
# Both will ensure the correct Bash command is executed and that the
Rendered Template view in the UI
@@ -243,7 +250,7 @@ class BashOperator(BaseOperator):
f"Bash command failed. The command returned a non-zero exit
code {result.exit_code}."
)
- return result.output
+ return self.output_processor(result.output)
def on_kill(self) -> None:
self.subprocess_hook.send_sigterm()
diff --git a/docs/apache-airflow/howto/operator/bash.rst
b/docs/apache-airflow/howto/operator/bash.rst
index 50d7286fe1..daf430fa14 100644
--- a/docs/apache-airflow/howto/operator/bash.rst
+++ b/docs/apache-airflow/howto/operator/bash.rst
@@ -184,6 +184,48 @@ exit code if you pass ``skip_on_exit_code``).
:end-before: [END howto_operator_bash_skip]
+Output processor
+----------------
+
+The ``output_processor`` parameter allows you to specify a lambda function
that processes the output of the bash script
+before it is pushed as an XCom. This feature is particularly useful for
manipulating the script's output directly within
+the BashOperator, without the need for additional operators or tasks.
+
+For example, consider a scenario where the output of the bash script is a JSON
string. With the ``output_processor``,
+you can transform this string into a JSON object before storing it in XCom.
This simplifies the workflow and ensures
+that downstream tasks receive the processed data in the desired format.
+
+Here's how you can use the ``output_processor`` with the BashOperator:
+
+.. tab-set::
+
+ .. tab-item:: @task.bash
+ :sync: taskflow
+
+ .. code-block:: python
+
+ @task.bash(output_processor=lambda output: json.loads(output))
+ def bash_task() -> str:
+ return """
+ jq -c '.[] | select(.lastModified > "{{
data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start |
ts_zulu }}")' \\
+ example.json
+ """
+
+ .. tab-item:: BashOperator
+ :sync: operator
+
+ .. code-block:: python
+
+ bash_task = BashOperator(
+ task_id="filter_today_changes",
+ bash_command="""
+ jq -c '.[] | select(.lastModified > "{{
data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start |
ts_zulu }}")' \\
+ example.json
+ """,
+ output_processor=lambda output: json.loads(output),
+ )
+
+
Executing commands from files
-----------------------------
Both the ``BashOperator`` and ``@task.bash`` TaskFlow decorator enable you to
execute Bash commands stored
diff --git a/tests/operators/test_bash.py b/tests/operators/test_bash.py
index f3e9480c71..4ef4973f08 100644
--- a/tests/operators/test_bash.py
+++ b/tests/operators/test_bash.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+import json
import os
import signal
from datetime import datetime, timedelta
@@ -228,6 +229,16 @@ class TestBashOperator:
)
op.execute(context)
+ def test_bash_operator_output_processor(self, context):
+ json_string = '{"AAD_BASIC": "Azure Active Directory Basic"}'
+ op = BashOperator(
+ task_id="test_bash_operator_output_processor",
+ bash_command=f"echo '{json_string}'",
+ output_processor=lambda output: json.loads(output),
+ )
+ result = op.execute(context)
+ assert result == json.loads(json_string)
+
@pytest.mark.db_test
def test_bash_operator_kill(self, dag_maker):
import psutil