le-chartreux commented on issue #39801:
URL: https://github.com/apache/airflow/issues/39801#issuecomment-2176415067
Hello Airflow Community,
While waiting for a resolution to the trigger rule issue, I wanted to share
a workaround that I've implemented.
It may not address every scenario, but it could help with similar use cases.
## Problem Context
In scenarios involving branching, where only one (or a subset) of several
tasks is executed and its result is needed downstream, the ideal trigger rule
for the downstream task that collects the output would be
`NONE_FAILED_MIN_ONE_SUCCESS`.
For example:
```txt
branching ─┬─> task_a ─┐
           │  OR       ├─> task_to_get_the_result_of_the_task_that_ran
           └─> task_b ─┘
```
Unfortunately, this trigger rule doesn't work as expected within dynamic
task groups, leading to the downstream task being erroneously skipped.
## Workaround Strategy
My workaround involves using the `NONE_FAILED` trigger rule, which does not
exhibit the skipping behavior.
To emulate the desired `NONE_FAILED_MIN_ONE_SUCCESS` logic, I've
incorporated the use of `AirflowSkipException` to skip the downstream task when
all preceding tasks are skipped (i.e., when all outputs are None).
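The combined effect of `NONE_FAILED` plus the explicit skip can be modeled in plain Python. This is only a rough sketch of the intended outcomes, not Airflow code: `emulated_outcome` is a hypothetical helper, and the state names are plain strings chosen for illustration.

```python
def emulated_outcome(upstream_states: list[str]) -> str:
    """Model the state the downstream task ends in (plain Python, no Airflow)."""
    # NONE_FAILED: a failed upstream task blocks the downstream task before it runs.
    if any(state in ("failed", "upstream_failed") for state in upstream_states):
        return "upstream_failed"
    # The task runs; if every upstream task was skipped, all inputs are None
    # and the task body raises AirflowSkipException.
    if all(state == "skipped" for state in upstream_states):
        return "skipped"
    # No failure and at least one upstream success: the result is returned.
    return "success"


print(emulated_outcome(["success", "skipped"]))
print(emulated_outcome(["skipped", "skipped"]))
print(emulated_outcome(["failed", "skipped"]))
```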
Below is an example of using `NONE_FAILED` instead of
`NONE_FAILED_MIN_ONE_SUCCESS`:
```py
from airflow.decorators import dag, task, task_group
from airflow.exceptions import AirflowSkipException
from airflow.utils.trigger_rule import TriggerRule
from pendulum import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def workaround():
    directions = get_directions()
    get_message_from_direction.expand(direction=directions)


@task
def get_directions() -> list[str]:
    return ["left", "right", "bottom"]


@task_group
def get_message_from_direction(direction: str) -> str | None:
    message_from_left = left_task()
    message_from_right = right_task()
    choose_next_task_from_direction(direction) >> [message_from_left, message_from_right]
    return get_message(message_from_left, message_from_right)


@task.branch
def choose_next_task_from_direction(direction: str) -> str | None:
    this_group = "get_message_from_direction"
    if direction == "left":
        return f"{this_group}.left_task"
    if direction == "right":
        return f"{this_group}.right_task"
    return None  # nothing follows


@task
def left_task():
    return "message from left"


@task
def right_task():
    return "message from right"


# Neither NONE_FAILED_MIN_ONE_SUCCESS nor ONE_SUCCESS is used because they don't
# work inside dynamic task groups: see https://github.com/apache/airflow/issues/39801
# As a workaround, NONE_FAILED is used, which is not a problem:
# - If no previous task failed and at least one of the previous tasks returns a
#   message (i.e., at least one succeeded), everything is fine and the message
#   is returned.
# - If at least one of the previous tasks failed, this task will not be triggered.
# - If all the previous tasks are skipped, no message will be provided and this
#   task will be skipped with the AirflowSkipException.
@task(trigger_rule=TriggerRule.NONE_FAILED)
def get_message(message_from_left: str | None, message_from_right: str | None) -> str:
    # Please note that the output of a successful task must not be falsy
    # for the `or` to work! Otherwise, use something like
    # 'if message_from_left is not None: ...'
    message = message_from_left or message_from_right
    if message:
        # one succeeded
        return message
    raise AirflowSkipException("Both skipped.")


workaround()
```
The `get_message` task effectively mimics the behavior of
`NONE_FAILED_MIN_ONE_SUCCESS`. It will:
- Retrieve the message from the task that was executed and succeeded.
- Be skipped if both preceding tasks were skipped.
- Not be triggered if any preceding task failed.
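
As the comment in `get_message` points out, the `or` shortcut breaks when a successful task returns a falsy value such as `""` or `0`. Below is a minimal sketch of the `is not None` variant, written as plain Python so it runs without Airflow. `first_available` and `NoResultError` are hypothetical names used only for illustration; inside the real task you would raise `AirflowSkipException` instead.

```python
class NoResultError(Exception):
    """Hypothetical stand-in for AirflowSkipException in this sketch."""


def first_available(*results):
    """Return the first result that is not None, even if it is falsy."""
    for result in results:
        if result is not None:
            return result
    # Every upstream task was skipped, so there is nothing to return.
    raise NoResultError("All upstream tasks were skipped.")


# An empty string or a zero still counts as a valid upstream result.
print(repr(first_available(None, "")))
print(repr(first_available(None, 0)))
```

With this variant, only a genuinely missing output (`None`) leads to the skip, so tasks are free to return any value.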
I hope this workaround proves useful to others facing the same issue.
Your feedback and suggestions for improvement are welcome.
Best regards,
Nathan Rousseau