le-chartreux commented on issue #39801:
URL: https://github.com/apache/airflow/issues/39801#issuecomment-2176415067

   Hello Airflow Community,
   
   While waiting for a resolution to the trigger rule issue, I wanted to share a workaround that I've implemented.
   While it may not address every scenario, it could be beneficial for similar use cases.
   
   ## Problem Context
   
   In branching scenarios, where only one (or a subset) of several tasks runs and its result is needed downstream, the ideal trigger rule for the downstream task that consumes that output is `NONE_FAILED_MIN_ONE_SUCCESS`.
   
   For example:
   
   ```txt
   branching ─┬─> task_a ─┐
              OR          ├─> task_to_get_the_result_of_the_task_that_ran
              └─> task_b ─┘
   ```
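For reference, the decision `NONE_FAILED_MIN_ONE_SUCCESS` makes once all upstream tasks have finished can be summarized with the simplified, hypothetical model below. It is plain Python, not Airflow's actual scheduler code; the `State` enum and the `none_failed_min_one_success` function are illustrative names that only cover the four terminal states relevant here.

```python
from __future__ import annotations

from enum import Enum


class State(Enum):
    # Hypothetical subset of Airflow's terminal task states
    SUCCESS = "success"
    SKIPPED = "skipped"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"


def none_failed_min_one_success(upstream: list[State]) -> State | None:
    """Return the state the downstream task receives without running,
    or None if it is allowed to run (simplified model, not Airflow code)."""
    if any(s in (State.FAILED, State.UPSTREAM_FAILED) for s in upstream):
        return State.UPSTREAM_FAILED
    if State.SUCCESS not in upstream:
        # No failure and no success: every upstream task was skipped
        return State.SKIPPED
    return None  # no failure and at least one success: the task runs


# The three possible outcomes of the branching example above
print(none_failed_min_one_success([State.SUCCESS, State.SKIPPED]))  # task_a ran
print(none_failed_min_one_success([State.SKIPPED, State.SUCCESS]))  # task_b ran
print(none_failed_min_one_success([State.SKIPPED, State.SKIPPED]))  # nothing ran
```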
   
   Unfortunately, this trigger rule doesn't work as expected within dynamic task groups: the downstream task is erroneously skipped.
   
   ## Workaround Strategy
   
   My workaround uses the `NONE_FAILED` trigger rule instead, which does not exhibit this skipping behavior.
   To emulate the desired `NONE_FAILED_MIN_ONE_SUCCESS` logic, the downstream task raises `AirflowSkipException` itself when all preceding tasks were skipped (i.e., when all of its inputs are `None`).
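The strategy can be modelled the same way. In the simplified, hypothetical sketch below (plain Python, not Airflow code), `none_failed` only blocks the task when an upstream task failed, and `task_body` supplies the missing "at least one success" check by skipping itself when every input is `None`. `SkipTask` is a stand-in for `AirflowSkipException` so the snippet runs without Airflow installed; all names here are illustrative.

```python
from __future__ import annotations

from enum import Enum


class State(Enum):
    # Hypothetical subset of Airflow's terminal task states
    SUCCESS = "success"
    SKIPPED = "skipped"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"


class SkipTask(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException."""


def none_failed(upstream: list[State]) -> State | None:
    """Simplified NONE_FAILED: block the task only on upstream failures."""
    if any(s in (State.FAILED, State.UPSTREAM_FAILED) for s in upstream):
        return State.UPSTREAM_FAILED
    return None  # runs even when every upstream task was skipped


def task_body(outputs: list[str | None]) -> str:
    """Return the first non-None input, or skip if there is none.
    Uses `is not None` so falsy outputs such as "" are still accepted."""
    for output in outputs:
        if output is not None:
            return output
    raise SkipTask("All upstream tasks were skipped.")


def emulated_outcome(upstream: list[State], outputs: list[str | None]) -> State:
    """Final state of the downstream task under the workaround."""
    blocked = none_failed(upstream)
    if blocked is not None:
        return blocked
    try:
        task_body(outputs)
    except SkipTask:
        return State.SKIPPED
    return State.SUCCESS


print(emulated_outcome([State.SUCCESS, State.SKIPPED], ["message from left", None]))
print(emulated_outcome([State.SKIPPED, State.SKIPPED], [None, None]))
```

The model also makes the underlying assumption visible: if an upstream task succeeds but returns `None`, the emulation skips the downstream task, whereas `NONE_FAILED_MIN_ONE_SUCCESS` would have run it.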
   
   Below is an example of using `NONE_FAILED` instead of `NONE_FAILED_MIN_ONE_SUCCESS`:
   
   ```py
   from airflow.decorators import dag, task, task_group
   from pendulum import datetime
   from airflow.utils.trigger_rule import TriggerRule
   from airflow.exceptions import AirflowSkipException
   
   
   @dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
   def workaround():
       directions = get_directions()
       get_message_from_direction.expand(direction=directions)
   
   
   @task
   def get_directions() -> list[str]:
       return ["left", "right", "bottom"]
   
   
   @task_group
   def get_message_from_direction(direction: str) -> str | None:
       message_from_left = left_task()
       message_from_right = right_task()
       choose_next_task_from_direction(direction) >> [message_from_left, message_from_right]
       return get_message(message_from_left, message_from_right)
   
   
   @task.branch
   def choose_next_task_from_direction(direction: str) -> str | None:
       this_group = "get_message_from_direction"
       if direction == "left":
           return f"{this_group}.left_task"
       if direction == "right":
           return f"{this_group}.right_task"
       # return None, nothing follows
   
   
   @task
   def left_task():
       return "message from left"
   
   
   @task
   def right_task():
       return "message from right"
   
   
   # Neither NONE_FAILED_MIN_ONE_SUCCESS nor ONE_SUCCESS is used because they don't work
   # inside dynamic task groups: see https://github.com/apache/airflow/issues/39801
   # As a workaround, NONE_FAILED is used, which is fine here:
   # - If no upstream task failed and at least one of them returned a message
   #   (i.e., at least one succeeded), the message is returned.
   # - If at least one upstream task failed, this task does not run
   #   (it is marked upstream_failed).
   # - If all upstream tasks were skipped, no message is available and this task
   #   skips itself by raising AirflowSkipException.
   @task(trigger_rule=TriggerRule.NONE_FAILED)
   def get_message(message_from_left: str | None, message_from_right: str | None) -> str:
       # Note: the output of a successful task must not be falsy for `or` to work!
       # Otherwise, use something like `if message_from_left is not None: ...`
       message = message_from_left or message_from_right
       if message:
           # At least one upstream task succeeded
           return message
       raise AirflowSkipException("Both skipped.")
   
   
   workaround()
   ```
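To see what each mapped instance of the group should end up doing, the following plain-Python trace (no Airflow required) mirrors the DAG's logic for the three directions returned by `get_directions`. It assumes, as described above, that a task skipped by the branch yields `None` to `get_message`; `choose`, `run_group` and `SkipTask` are hypothetical names, the last one standing in for `AirflowSkipException`.

```python
from __future__ import annotations


class SkipTask(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowSkipException."""


def choose(direction: str) -> str | None:
    # Mirrors choose_next_task_from_direction (without the group prefix)
    if direction == "left":
        return "left_task"
    if direction == "right":
        return "right_task"
    return None  # nothing follows the branch


def run_group(direction: str) -> str:
    chosen = choose(direction)
    # A task not picked by the branch is skipped, so get_message receives None for it
    message_from_left = "message from left" if chosen == "left_task" else None
    message_from_right = "message from right" if chosen == "right_task" else None
    # Same selection logic as get_message
    message = message_from_left or message_from_right
    if message:
        return message
    raise SkipTask("Both skipped.")


# One entry per mapped instance of the task group
outcomes: dict[str, str] = {}
for direction in ["left", "right", "bottom"]:
    try:
        outcomes[direction] = run_group(direction)
    except SkipTask:
        outcomes[direction] = "skipped"
print(outcomes)
```

Running it prints `{'left': 'message from left', 'right': 'message from right', 'bottom': 'skipped'}`: only the `bottom` instance of `get_message` ends up skipped.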
   
   The `get_message` task effectively mimics the behavior of `NONE_FAILED_MIN_ONE_SUCCESS`:
   
   - It retrieves the message from the task that ran and succeeded.
   - It is skipped if both preceding tasks were skipped.
   - It does not run (it is marked `upstream_failed`) if any preceding task failed.
   
   I hope this workaround proves useful to others facing the same issue.
   Your feedback and suggestions for improvement are welcome.
   
   Best regards,
   Nathan Rousseau
   

