TJaniF opened a new pull request, #34392:
URL: https://github.com/apache/airflow/pull/34392

   Currently, tasks with the trigger rule `all_skipped` end up in the `no status` state if they have an upstream task in the `upstream_failed` state.
   
   
![image](https://github.com/apache/airflow/assets/90063506/7449e4a5-cf8f-4dd6-b830-b9c93a1a60b8)
   
   The logs show a task deadlock error:
   
   ```
   [2023-09-15T12:36:02.600+0000] {dagrun.py:674} ERROR - Task deadlock (no 
runnable tasks); marking run <DagRun all_skipped_example_2 @ 2023-09-15 
12:35:59.760313+00:00: manual__2023-09-15T12:35:59.760313+00:00, state:running, 
queued_at: 2023-09-15 12:35:59.767117+00:00. externally triggered: True> failed
   ```
   
   This did not seem right, so I attempted to fix it by adding `upstream_failed` as a condition for marking tasks with the `all_skipped` trigger rule as skipped.
   
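   To illustrate the idea, here is a minimal, self-contained sketch of how the `all_skipped` rule could treat `upstream_failed` like the other terminal states. This is an illustration only, not Airflow's actual `TriggerRuleDep` code; the function name and return convention are invented here, and only the state names mirror Airflow's.
   
   ```python
   # Hypothetical, simplified evaluation of the `all_skipped` trigger rule.
   # Not Airflow's real implementation -- just a sketch of the proposed logic.
   def evaluate_all_skipped(upstream_states):
       """Return "skipped" if the task should be skipped, or None if it
       may run (all upstream tasks finished in the "skipped" state)."""
       if all(s == "skipped" for s in upstream_states):
           return None  # all upstreams skipped -> the task may run
       # The fix: treat "upstream_failed" like "success" and "failed", so
       # the task is skipped instead of deadlocking with no status.
       if any(s in ("success", "failed", "upstream_failed") for s in upstream_states):
           return "skipped"
       return None  # upstreams not yet in a terminal state
   ```
   
   With this change, the `upstream_failed` upstream in the reproduction DAG below would cause `all_skipped_task_1` to be skipped rather than stuck with no status.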
   After the fix, this is the behavior of the same DAG.
   
   
![image](https://github.com/apache/airflow/assets/90063506/5201c910-dd48-46ce-8b82-4b4a95aac6ea)
   
   This behavior matches `all_failed`, where an upstream task in the `success` or `skipped` state leads to the task being skipped. However, there may be an argument for having a task with the `all_skipped` trigger rule fail instead of being skipped when an upstream task is in the `upstream_failed` state, to preserve the final DAG state if the `all_skipped` task is the only leaf: the current error leads to a DAG run failure, while skipping the final task marks the run as successful.
   
   DAG to reproduce:
   
   ```python
   from airflow.decorators import dag, task
   from pendulum import datetime
   
   @dag(
       start_date=datetime(2023, 8, 1),
       schedule=None,
       catchup=False,
   )
   def all_skipped_example_2():
       @task
       def upstream_task_0():
           raise Exception("Task 0 failed")
   
       @task
       def upstream_task_1():
           return "hi"
   
       @task(trigger_rule="all_skipped")
       def all_skipped_task_1():
           return "hi"
   
       @task
       def downstream_task():
           return "hi"
   
       upstream_task_1_obj = upstream_task_1()
   
       (upstream_task_1_obj >> all_skipped_task_1() >> downstream_task())
   
       upstream_task_0() >> upstream_task_1_obj
   
   
   all_skipped_example_2()
   
   ```
   
   I also attempted to add a test; apologies in advance.
   
   
   

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.