This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0d9a26ceef Add branching based on mapped task group example to
dynamic-task-mapping.rst (#36480)
0d9a26ceef is described below
commit 0d9a26ceefb2a5661f19cce292e05939d3f2a0c1
Author: Ryan Hatter <[email protected]>
AuthorDate: Fri Dec 29 18:04:14 2023 -0500
Add branching based on mapped task group example to
dynamic-task-mapping.rst (#36480)
* Add branching based on mapped task group example to
dynamic-task-mapping.rst
Based on trying to solve [this stack overflow
question](https://stackoverflow.com/questions/77730116/branching-not-working-in-airflow-as-expected/77730300#77730300),
it seems impossible to reliably branch mapped tasks based on the result of an
upstream task. However, it's possible to do this in a mapped task group, which
this example demonstrates.
* trying to force blacken-docs
---
.../dynamic-task-mapping.rst | 44 ++++++++++++++++++++--
1 file changed, 41 insertions(+), 3 deletions(-)
diff --git
a/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
b/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
index ecfe8e2413..81102dd54e 100644
--- a/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
@@ -313,7 +313,7 @@ For example, this code will *not* work:
@task_group
- def my_group(value):
+ def my_task_group(value):
if not value: # DOES NOT work as you'd expect!
task_a = EmptyOperator(...)
else:
@@ -321,9 +321,9 @@ For example, this code will *not* work:
task_a << my_task(value)
- my_group.expand(value=[0, 1, 2])
+ my_task_group.expand(value=[0, 1, 2])
-When code in ``my_group`` is executed, ``value`` would still only be a
reference, not the real value, so the ``if not value`` branch will not work as
you likely want. However, if you pass that reference into a task, it will
become resolved when the task is executed, and the three ``my_task`` instances
will therefore receive 1, 2, and 3, respectively.
+When code in ``my_task_group`` is executed, ``value`` would still only be a
reference, not the real value, so the ``if not value`` branch will not work as
you likely want. However, if you pass that reference into a task, it will
become resolved when the task is executed, and the three ``my_task`` instances
will therefore receive 1, 2, and 3, respectively.
It is, therefore, important to remember that, if you intend to perform any
logic to a value passed into a task group function, you must always use a task
to run the logic, such as ``@task.branch`` (or ``BranchPythonOperator``) for
conditions, and task mapping methods for loops.
@@ -375,6 +375,44 @@ Similar to a mapped task group, depending on a mapped task
group's output would
It is also possible to perform any operations as results from a normal mapped
task.
+Branching on a mapped task group's output
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+While it's not possible to implement branching logic (for example using
``@task.branch``) on the results of a mapped task, it is possible to branch
based on the *input* of a task group. The following example demonstrates
executing one of three tasks based on the input to a mapped task group.
+
+.. code-block:: python
+
+ inputs = ["a", "b", "c"]
+
+
+ @task_group(group_id="my_task_group")
+ def my_task_group(input):
+ @task.branch
+ def branch(element):
+ if "a" in element:
+ return "my_task_group.a"
+ elif "b" in element:
+ return "my_task_group.b"
+ else:
+ return "my_task_group.c"
+
+ @task
+ def a():
+ print("a")
+
+ @task
+ def b():
+ print("b")
+
+ @task
+ def c():
+ print("c")
+
+ branch(input) >> [a(), b(), c()]
+
+
+ my_task_group.expand(input=inputs)
+
Filtering items from a mapped task
==================================