inytar opened a new issue, #28973:
URL: https://github.com/apache/airflow/issues/28973
### Apache Airflow version
2.5.0
### What happened
In some cases we are seeing dynamic mapped task being skipped before
upstream tasks have started & the dynamic count for the task can be calculated.
We see this both locally in a with the `LocalExecutor` & on our cluster with
the `KubernetesExecutor`.
To trigger the issue we need multiple dynamic tasks merging into a upstream
task, see the image below for example. If there is no merging the tasks run as
expected. The tasks also need to not know the number of dynamic tasks that will
be created on DAG start, for example by chaining in an other dynamic task
output.
If the DAG, task, or upstream tasks are cleared the skipped task runs as
expected.
The issue exists both on airflow 2.4.x & 2.5.0.
Happy to help debug this further & answer any questions!
### What you think should happen instead
The tasks should run after upstream tasks are done.
### How to reproduce
The following code is able to reproduce the issue on our side:
```python
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup
from airflow.operators.empty import EmptyOperator
# Only one chained tasks results in only 1 of the `skipped_tasks` skipping.
# Add in extra tasks results in both `skipped_tasks` skipping, but
# no earlier tasks are ever skipped.
CHAIN_TASKS = 1
@task()
def add(x, y):
return x, y
with DAG(
dag_id="test_skip",
schedule=None,
start_date=datetime(2023, 1, 13),
) as dag:
init = EmptyOperator(task_id="init_task")
final = EmptyOperator(task_id="final")
for i in range(2):
with TaskGroup(f"task_group_{i}") as tg:
chain_task = [i]
for j in range(CHAIN_TASKS):
chain_task = add.partial(x=j).expand(y=chain_task)
skipped_task = (
add.override(task_id="skipped").partial(x=i).expand(y=chain_task)
)
# Task isn't skipped if final (merging task) is removed.
init >> tg >> final
```
### Operating System
MacOS
### Versions of Apache Airflow Providers
This can be reproduced without any extra providers installed.
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]