yuqian90 opened a new pull request #22374:
URL: https://github.com/apache/airflow/pull/22374
Only include downstream tasks whose `trigger_rule` is `ALL_SUCCESS`, to prevent
tasks from being wrongly skipped by `schedule_after_task_execution`.
FIXES #19222
The reported issue started with #18338, "Fix mini scheduler not respecting
wait_for_downstream dep".
How to reproduce:
```
import pendulum
from airflow.models import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.sensors.python import PythonSensor
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="example_wrong_skip",
    schedule_interval="@daily",
    catchup=False,
    start_date=pendulum.DateTime(2022, 1, 1),
) as dag:
    # Always follows task_b, so task_a is skipped.
    branch = BranchPythonOperator(task_id="branch", python_callable=lambda: "task_b")
    task_a = PythonOperator(task_id="task_a", python_callable=lambda: True)
    task_b = PythonOperator(task_id="task_b", python_callable=lambda: True)
    # Sensor that never succeeds, so task_c stays running.
    task_c = PythonSensor(task_id="task_c", python_callable=lambda: False)
    # Should wait for task_c to finish before being run or skipped.
    task_d = PythonOperator(
        task_id="task_d",
        python_callable=lambda: True,
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    branch >> [task_a, task_b]
    [task_a, task_c] >> task_d
```

Observe that `task_d`, which has the `none_failed_min_one_success` trigger_rule,
is skipped before `task_c` even finishes. This violates the `trigger_rule`
logic of `none_failed_min_one_success`: `task_d` must not be skipped while one
of its upstream tasks is still running.
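To make the expected behaviour concrete, here is a toy model of the `none_failed_min_one_success` rule. It is a simplified sketch, not Airflow's `TriggerRuleDep` implementation; the function name `evaluate_none_failed_min_one_success` and the plain state strings are hypothetical.

```python
# Hypothetical, simplified model of the none_failed_min_one_success trigger rule.
# This is NOT Airflow's TriggerRuleDep; it only mirrors the rule's intent.
FINISHED = {"success", "skipped", "failed", "upstream_failed"}


def evaluate_none_failed_min_one_success(upstream_states: dict[str, str]) -> str:
    """Return 'upstream_failed', 'skipped', 'wait' or 'run' for a task."""
    states = list(upstream_states.values())
    if any(s in ("failed", "upstream_failed") for s in states):
        return "upstream_failed"  # a failure upstream propagates
    if states and all(s == "skipped" for s in states):
        return "skipped"  # every upstream was skipped, so nothing succeeded
    if not all(s in FINISHED for s in states):
        return "wait"  # some upstream task is still queued or running
    return "run"  # all done, none failed, at least one succeeded


# In the reproducing DAG, task_a is skipped by the branch and task_c is still poking.
print(evaluate_none_failed_min_one_success({"task_a": "skipped", "task_c": "running"}))
# wait
```

Once `task_c` succeeds the model returns `'run'`; it returns `'skipped'` only when every upstream task is skipped.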
This happens because #18338 changed `include_downstream` to `True` in the
following call:
```
partial_dag = task.dag.partial_subset(
    task.downstream_task_ids,
    include_downstream=True,
    include_upstream=False,
    include_direct_upstream=True,
)
```
That change caused the `partial_dag` in the "mini scheduler" to include all
downstream tasks, including indirect ones.
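The set logic can be sketched on the reproducing DAG with plain dictionaries. This is a hypothetical model, not Airflow's `DAG.partial_subset`; the names `DOWNSTREAM`, `upstream_of`, `all_downstream` and `partial_subset_model` are illustrative. It assumes that `include_direct_upstream=True` adds only the direct upstream tasks of the matched task IDs, which is consistent with `task_c` being left out.

```python
# Hypothetical, simplified model of what the partial DAG contains; it is not
# Airflow's DAG.partial_subset, just the set logic described above.
DOWNSTREAM = {  # edges of the reproducing DAG: task -> direct downstream tasks
    "branch": {"task_a", "task_b"},
    "task_a": {"task_d"},
    "task_b": set(),
    "task_c": {"task_d"},
    "task_d": set(),
}


def upstream_of(task_id: str) -> set[str]:
    """Direct upstream tasks of task_id."""
    return {t for t, downs in DOWNSTREAM.items() if task_id in downs}


def all_downstream(task_id: str) -> set[str]:
    """Every task reachable downstream of task_id (direct and indirect)."""
    seen: set[str] = set()
    stack = list(DOWNSTREAM[task_id])
    while stack:
        t = stack.pop()
        if t not in seen:
            seen.add(t)
            stack.extend(DOWNSTREAM[t])
    return seen


def partial_subset_model(task_ids: set[str], include_downstream: bool) -> set[str]:
    """Matched tasks, optionally all their downstream tasks, plus the direct
    upstream tasks of the matched tasks only (assumed include_direct_upstream)."""
    included = set(task_ids)
    for t in task_ids:
        if include_downstream:
            included |= all_downstream(t)
        included |= upstream_of(t)
    return included


# The mini scheduler runs after `branch` finishes, matching branch's direct downstream.
print(sorted(partial_subset_model(DOWNSTREAM["branch"], include_downstream=True)))
# ['branch', 'task_a', 'task_b', 'task_d']
```

With `include_downstream=False` the model yields only `branch`, `task_a` and `task_b`, so `task_d` is never evaluated; with `True`, `task_d` is pulled in while its other upstream, `task_c`, is not.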
In the reproducing example, once `branch` finishes, the "mini scheduler" creates
a `partial_dag` that includes `task_a`, `task_b` and `task_d`, but not `task_c`,
because `task_c` is not downstream of `branch`. Looking only at this
`partial_dag`, the "mini scheduler" decides that `task_d` can be skipped,
because its only upstream task in `partial_dag`, `task_a`, is in the skipped
state. This happens in `DagRun._get_ready_tis()` when it calls
`st.are_dependencies_met()`.
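The core of the problem is that the skip check only sees upstream tasks that exist in the partial DAG. The snippet below is a hypothetical, stand-alone illustration using plain sets rather than Airflow objects. It assumes the partial DAG contents described above and models the rule's "skip when every upstream task is skipped" condition.

```python
# Hypothetical illustration: task_d's upstream tasks as seen in the full DAG
# versus inside the partial DAG built after `branch` finishes.
full_upstream_of_task_d = {"task_a", "task_c"}
partial_dag_tasks = {"branch", "task_a", "task_b", "task_d"}  # task_c is absent
states = {"task_a": "skipped", "task_c": "running"}


def all_upstream_skipped(upstream: set[str]) -> bool:
    """Modelled none_failed_min_one_success skip condition: every upstream skipped."""
    return bool(upstream) and all(states[t] == "skipped" for t in upstream)


# Only upstream tasks that are part of the partial DAG are visible.
visible_upstream = full_upstream_of_task_d & partial_dag_tasks

print(all_upstream_skipped(full_upstream_of_task_d))  # False: task_c still running
print(all_upstream_skipped(visible_upstream))  # True: only skipped task_a is visible
```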
This PR fixes the issue by making the "mini scheduler" only consider, and
therefore only skip, downstream tasks whose `trigger_rule` is `ALL_SUCCESS`.
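The idea behind the fix can be sketched as a filter over the downstream tasks. This is a hypothetical illustration, not the PR's actual diff; `TRIGGER_RULES` and `mini_scheduler_candidates` are made-up names. For `all_success`, a single skipped upstream task is enough to skip the task, so the task would not run whatever the state of any upstream task missing from the partial view. Rules such as `none_failed_min_one_success` need to see every upstream task.

```python
# Hypothetical sketch of the fix's idea: before the mini scheduler evaluates
# downstream tasks, keep only those whose trigger rule is all_success.
# Names and data structures are illustrative, not Airflow's code.
TRIGGER_RULES = {
    "task_a": "all_success",  # the default trigger rule
    "task_b": "all_success",
    "task_d": "none_failed_min_one_success",
}


def mini_scheduler_candidates(downstream_task_ids: set[str]) -> set[str]:
    """Downstream tasks the mini scheduler may evaluate (and possibly skip)."""
    return {t for t in downstream_task_ids if TRIGGER_RULES[t] == "all_success"}


print(sorted(mini_scheduler_candidates({"task_a", "task_b", "task_d"})))
# ['task_a', 'task_b']
```

In this model `task_d` is left to the main scheduler, which sees both `task_a` and `task_c`.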