This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cd69f8efc0 Added `output_processor` parameter to `BashProcessor` 
(#40843)
cd69f8efc0 is described below

commit cd69f8efc0e26507518757ed4a357cba74cc8a13
Author: David Blain <[email protected]>
AuthorDate: Mon Aug 5 12:06:23 2024 +0200

    Added `output_processor` parameter to `BashProcessor` (#40843)
    
    
    
    ---------
    
    Co-authored-by: David Blain <[email protected]>
    Co-authored-by: Shahar Epstein <[email protected]>
---
 airflow/operators/bash.py                   | 13 ++++++---
 docs/apache-airflow/howto/operator/bash.rst | 42 +++++++++++++++++++++++++++++
 tests/operators/test_bash.py                | 11 ++++++++
 3 files changed, 63 insertions(+), 3 deletions(-)

diff --git a/airflow/operators/bash.py b/airflow/operators/bash.py
index 18ea9eb9d4..2ec0341a0d 100644
--- a/airflow/operators/bash.py
+++ b/airflow/operators/bash.py
@@ -21,7 +21,7 @@ import os
 import shutil
 import warnings
 from functools import cached_property
-from typing import TYPE_CHECKING, Container, Sequence, cast
+from typing import TYPE_CHECKING, Any, Callable, Container, Sequence, cast
 
 from airflow.exceptions import AirflowException, AirflowSkipException
 from airflow.hooks.subprocess import SubprocessHook
@@ -63,6 +63,8 @@ class BashOperator(BaseOperator):
         If None (default), the command is run in a temporary directory.
         To use current DAG folder as the working directory,
         you might set template ``{{ dag_run.dag.folder }}``.
+    :param output_processor: Function to further process the output of the 
bash script
+        (default is lambda output: output).
 
     Airflow will evaluate the exit code of the Bash command. In general, a 
non-zero exit code will result in
     task failure and zero will result in task success.
@@ -130,6 +132,9 @@ class BashOperator(BaseOperator):
             env={"message": '{{ dag_run.conf["message"] if dag_run else "" 
}}'},
         )
 
+    .. versionadded:: 2.10.0
+       The `output_processor` parameter.
+
     """
 
     template_fields: Sequence[str] = ("bash_command", "env", "cwd")
@@ -147,6 +152,7 @@ class BashOperator(BaseOperator):
         skip_exit_code: int | None = None,
         skip_on_exit_code: int | Container[int] | None = 99,
         cwd: str | None = None,
+        output_processor: Callable[[str], Any] = lambda result: result,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -167,6 +173,7 @@ class BashOperator(BaseOperator):
         )
         self.cwd = cwd
         self.append_env = append_env
+        self.output_processor = output_processor
 
         # When using the @task.bash decorator, the Bash command is not known 
until the underlying Python
         # callable is executed and therefore set to NOTSET initially. This 
flag is useful during execution to
@@ -221,7 +228,7 @@ class BashOperator(BaseOperator):
                 raise AirflowException(f"The cwd {self.cwd} must be a 
directory")
         env = self.get_env(context)
 
-        # Because the bash_command value is evaluated at runtime using the 
@tash.bash decorator, the
+        # Because the bash_command value is evaluated at runtime using the 
@task.bash decorator, the
         # RenderedTaskInstanceField data needs to be rewritten and the 
bash_command value re-rendered -- the
         # latter because the returned command from the decorated callable 
could contain a Jinja expression.
         # Both will ensure the correct Bash command is executed and that the 
Rendered Template view in the UI
@@ -243,7 +250,7 @@ class BashOperator(BaseOperator):
                 f"Bash command failed. The command returned a non-zero exit 
code {result.exit_code}."
             )
 
-        return result.output
+        return self.output_processor(result.output)
 
     def on_kill(self) -> None:
         self.subprocess_hook.send_sigterm()
diff --git a/docs/apache-airflow/howto/operator/bash.rst 
b/docs/apache-airflow/howto/operator/bash.rst
index 50d7286fe1..daf430fa14 100644
--- a/docs/apache-airflow/howto/operator/bash.rst
+++ b/docs/apache-airflow/howto/operator/bash.rst
@@ -184,6 +184,48 @@ exit code if you pass ``skip_on_exit_code``).
             :end-before: [END howto_operator_bash_skip]
 
 
+Output processor
+----------------
+
+The ``output_processor`` parameter allows you to specify a callable 
that processes the output of the bash script
+before it is pushed as an XCom. This feature is particularly useful for 
manipulating the script's output directly within
+the BashOperator, without the need for additional operators or tasks.
+
+For example, consider a scenario where the output of the bash script is a JSON 
string. With the ``output_processor``,
+you can transform this string into a JSON object before storing it in XCom. 
This simplifies the workflow and ensures
+that downstream tasks receive the processed data in the desired format.
+
+Here's how you can use the ``output_processor`` with the BashOperator:
+
+.. tab-set::
+
+    .. tab-item:: @task.bash
+        :sync: taskflow
+
+        .. code-block:: python
+
+            @task.bash(output_processor=lambda output: json.loads(output))
+            def bash_task() -> str:
+                return """
+                    jq -c '.[] | select(.lastModified > "{{ 
data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start | 
ts_zulu }}")' \\
+                    example.json
+                """
+
+    .. tab-item:: BashOperator
+        :sync: operator
+
+        .. code-block:: python
+
+            bash_task = BashOperator(
+                task_id="filter_today_changes",
+                bash_command="""
+                    jq -c '.[] | select(.lastModified > "{{ 
data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start | 
ts_zulu }}")' \\
+                    example.json
+                """,
+                output_processor=lambda output: json.loads(output),
+            )
+
+
 Executing commands from files
 -----------------------------
 Both the ``BashOperator`` and ``@task.bash`` TaskFlow decorator enables you to 
execute Bash commands stored
diff --git a/tests/operators/test_bash.py b/tests/operators/test_bash.py
index f3e9480c71..4ef4973f08 100644
--- a/tests/operators/test_bash.py
+++ b/tests/operators/test_bash.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import json
 import os
 import signal
 from datetime import datetime, timedelta
@@ -228,6 +229,16 @@ class TestBashOperator:
         )
         op.execute(context)
 
+    def test_bash_operator_output_processor(self, context):
+        json_string = '{"AAD_BASIC": "Azure Active Directory Basic"}'
+        op = BashOperator(
+            task_id="test_bash_operator_output_processor",
+            bash_command=f"echo '{json_string}'",
+            output_processor=lambda output: json.loads(output),
+        )
+        result = op.execute(context)
+        assert result == json.loads(json_string)
+
     @pytest.mark.db_test
     def test_bash_operator_kill(self, dag_maker):
         import psutil

Reply via email to