inytar opened a new issue, #28973:
URL: https://github.com/apache/airflow/issues/28973

   ### Apache Airflow version
   
   2.5.0
   
   ### What happened
   
   In some cases we are seeing a dynamically mapped task being skipped before 
its upstream tasks have started and before the dynamic count for the task can 
be calculated. We see this both locally with the `LocalExecutor` and on our 
cluster with the `KubernetesExecutor`.
   
   To trigger the issue we need multiple dynamic tasks merging into a single 
downstream task, see the image below for example. If there is no merging, the 
tasks run as expected. The number of mapped task instances must also be unknown 
when the DAG run starts, for example because the task expands over the output 
of another dynamic task.
   
   If the DAG, the task, or its upstream tasks are cleared, the skipped task 
runs as expected. 
   
   The issue exists on both Airflow 2.4.x and 2.5.0.
   
   Happy to help debug this further & answer any questions!
   
   ### What you think should happen instead
   
   The tasks should run after upstream tasks are done.
   
   ### How to reproduce
   
   The following code is able to reproduce the issue on our side:
   
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.decorators import task
   from airflow.utils.task_group import TaskGroup
   from airflow.operators.empty import EmptyOperator
   
   # With only one chained task, only 1 of the `skipped` tasks is skipped.
   # Adding extra chained tasks results in both `skipped` tasks being skipped,
   # but no earlier tasks are ever skipped.
   CHAIN_TASKS = 1
   
   
   @task()
   def add(x, y):
       return x, y
   
   
   with DAG(
       dag_id="test_skip",
       schedule=None,
       start_date=datetime(2023, 1, 13),
   ) as dag:
   
       init = EmptyOperator(task_id="init_task")
       final = EmptyOperator(task_id="final")
   
       for i in range(2):
           with TaskGroup(f"task_group_{i}") as tg:
               chain_task = [i]
               for j in range(CHAIN_TASKS):
                   chain_task = add.partial(x=j).expand(y=chain_task)
               skipped_task = (
                   add.override(task_id="skipped").partial(x=i).expand(y=chain_task)
               )
   
           # Task isn't skipped if final (merging task) is removed.
           init >> tg >> final
   ```
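
To make the expected behaviour concrete, here is a plain-Python sketch (no Airflow required) of how many mapped instances each task in the reproduction above should get once its upstream has finished. It assumes that expanding over a mapped task's output creates one instance per upstream result. `expand`, `simulate_group`, and the `chain_{j}` labels are hypothetical names used only for this illustration; they are not Airflow APIs or real task IDs.

```python
# Plain-Python model of the reproduction DAG; Airflow is not imported.
# Assumption: expanding over a mapped task's output yields one mapped
# instance per upstream result.


def add(x, y):
    # Same body as the `add` task in the report.
    return x, y


def expand(x, ys):
    """Model `add.partial(x=x).expand(y=ys)`: one call per element of ys."""
    return [add(x, y) for y in ys]


def simulate_group(i, chain_tasks=1):
    """Hypothetical helper: expected instance counts for task group `i`."""
    counts = {}
    chain_output = [i]  # literal input, so its length is known up front
    for j in range(chain_tasks):
        chain_output = expand(j, chain_output)
        counts[f"chain_{j}"] = len(chain_output)
    # The final task expands over output that only exists at run time.
    skipped_output = expand(i, chain_output)
    counts["skipped"] = len(skipped_output)
    return counts, skipped_output


if __name__ == "__main__":
    for i in range(2):
        counts, _ = simulate_group(i, chain_tasks=1)
        print(f"task_group_{i}: {counts}")
```

Under this model every task, including `skipped`, expands to exactly one instance, so the skip does not appear to come from an empty expansion; it would have to be decided before the upstream output is available.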
   
   ### Operating System
   
   macOS
   
   ### Versions of Apache Airflow Providers
   
   This can be reproduced without any extra providers installed.
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

