manugarri opened a new issue, #37119:
URL: https://github.com/apache/airflow/issues/37119

   ### Description
   
   Dynamic task mapping lets a DAG create tasks based on a number of inputs that is only known at run time. This is great because it not only makes DAGs flexible, but also reduces the workload on the scheduler, since the tasks are defined at run time.
   
   However, dynamically mapped tasks currently run all of their inputs in parallel (via the `expand` function). There are cases where a list of steps needs to run sequentially, but **the list is not known until run time**.
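
The difference between the two execution orders can be modelled in plain Python. This is only a sketch, not Airflow code: `run_mapped` and `run_sequential` are hypothetical helpers, and a thread pool stands in for the scheduler running mapped task instances concurrently. Both receive a list that is only known at run time; only the second guarantees that item N+1 starts after item N has finished.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def run_mapped(items: List[str], fn: Callable[[str], str]) -> List[str]:
    # Rough model of `expand`: one independent unit of work per item,
    # all submitted at once, so they may execute concurrently.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(fn, items))


def run_sequential(items: List[str], fn: Callable[[str], str], log: List[str]) -> List[str]:
    # Model of the requested behaviour: still driven by a run-time list,
    # but each item starts only after the previous one has finished.
    results = []
    for item in items:
        log.append(f"start {item}")
        results.append(fn(item))
        log.append(f"end {item}")
    return results


groups = ["group 1", "group 2"]  # only known at run time
log: List[str] = []
print(run_mapped(groups, str.upper))  # ['GROUP 1', 'GROUP 2'] (result order only, not execution order)
print(run_sequential(groups, str.upper, log))
print(log)  # ['start group 1', 'end group 1', 'start group 2', 'end group 2']
```

Note that `pool.map` returns results in input order even though the work may interleave; that is why ordered results alone do not prove sequential execution.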
   
   ### Use case/motivation
   
   An actual use case helps show why this feature would be useful.
   
   I am parsing a yaml file that describes some EMR steps inside a `@task`, but I cannot create tasks from inside another task (for example with a for loop) to build the operators that actually run the operations described in the yaml file.
   
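   The DAG looks more or less like this:
   ```python
   import pendulum
   import yaml
   from airflow import DAG
   from airflow.decorators import task, task_group
   from airflow.providers.amazon.aws.operators.emr import (
       EmrAddStepsOperator,
       EmrCreateJobFlowOperator,
       EmrTerminateJobFlowOperator,
   )

   yaml_path = "emr_steps.yaml"  # hypothetical path to the templated yaml

   with DAG(dag_id="emr_step_groups", start_date=pendulum.datetime(2024, 1, 1), schedule=None):
       @task
       def parse_yaml(yaml_path):
           """
           Read the templated yaml and expand the jinja variables inside.
           The yaml has blocks of steps that have to run sequentially,
           something like this:
           groups:
             - group_name: 1
               steps:
                 - step_name: 1.1
                   step_emr_command: spark-submit ...
                 - step_name: 1.2
                   step_emr_command: spark-submit ...
             - group_name: 2
               steps:
                 - step_name: 2.1
                   step_emr_command: spark-submit ...
                 - step_name: 2.2
                   step_emr_command: spark-submit ...
           """
           with open(yaml_path) as f:
               parsed_yaml = yaml.safe_load(f)  # jinja rendering omitted here
           return parsed_yaml["groups"]  # a list, so it can be mapped over

       @task
       def to_emr_steps(group_definition):
           # Convert the yaml steps into EMR step definitions (naive whitespace split of the command).
           return [
               {
                   "Name": str(step["step_name"]),
                   "ActionOnFailure": "CANCEL_AND_WAIT",
                   "HadoopJarStep": {"Jar": "command-runner.jar", "Args": step["step_emr_command"].split()},
               }
               for step in group_definition["steps"]
           ]

       @task_group
       def run_group(group_definition):
           setup_emr_cluster = EmrCreateJobFlowOperator(task_id="setup_emr_cluster")
           run_steps = EmrAddStepsOperator(
               task_id="run_steps",
               job_flow_id=setup_emr_cluster.output,
               steps=to_emr_steps(group_definition),
           )
           terminate_emr_cluster = EmrTerminateJobFlowOperator(
               task_id="terminate_emr_cluster",
               job_flow_id=setup_emr_cluster.output,
           )
           setup_emr_cluster >> run_steps >> terminate_emr_cluster

       parsed_yaml = parse_yaml(yaml_path)

       # this runs all the step groups in parallel (bad)
       run_group.expand(group_definition=parsed_yaml)
   ```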
   
   Hopefully this pseudo example shows what I'm trying to achieve. I need `run_group` for group 1 to finish before `run_group` for group 2 starts, instead of both running in parallel.
   
   Here is the graph that should run for the yaml file above:
   
   ```
   ┌────────────┐   ┌─ Group 1 ─────────────────────────────────┐   ┌─ Group 2 ─────────────────────────────────┐
   │ parse_yaml ├──►│ EMRCreate ──► step ──► step ──► Terminate ├──►│ EMRCreate ──► step ──► step ──► Terminate │
   │            │   │ Cluster1      1.1      1.2      Cluster1  │   │ Cluster2      2.1      2.2      Cluster2  │
   └────────────┘   └───────────────────────────────────────────┘   └───────────────────────────────────────────┘
   ```
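
The graph above is a single linear chain whose length depends on the parsed yaml. As a plain-Python sketch (the `GROUPS` data mirrors the example yaml; `sequential_chain` and `edges` are hypothetical helpers and the task labels are illustrative, not Airflow APIs or real task ids), this is the dependency list the requested feature would have to produce at run time:

```python
from typing import Dict, List, Tuple

# Parsed form of the example yaml, i.e. what parse_yaml would return.
GROUPS: List[Dict] = [
    {"group_name": 1, "steps": [{"step_name": "1.1"}, {"step_name": "1.2"}]},
    {"group_name": 2, "steps": [{"step_name": "2.1"}, {"step_name": "2.2"}]},
]


def sequential_chain(groups: List[Dict]) -> List[str]:
    """Flatten the groups into the single ordered chain from the graph."""
    chain = ["parse_yaml"]
    for group in groups:
        n = group["group_name"]
        chain.append(f"create_cluster_{n}")
        # Dots are replaced so the labels look like plain identifiers.
        chain.extend(f"step_{str(step['step_name']).replace('.', '_')}" for step in group["steps"])
        chain.append(f"terminate_cluster_{n}")
    return chain


def edges(chain: List[str]) -> List[Tuple[str, str]]:
    # Each task depends only on the one before it: a strictly linear DAG.
    return list(zip(chain, chain[1:]))


for upstream, downstream in edges(sequential_chain(GROUPS)):
    print(f"{upstream} >> {downstream}")  # first line: parse_yaml >> create_cluster_1
```

Each printed pair corresponds to an `upstream >> downstream` dependency; the difficulty described in this issue is that the list can only be computed after `parse_yaml` has run.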
   
   ### Related issues
   
   There was a previous, unanswered discussion in the repo ([link](https://github.com/apache/airflow/discussions/26093)).
   
   There also seem to be some related questions on StackOverflow ([1](https://stackoverflow.com/questions/75190990/design-airflow-pipeline-to-perform-sequential-tasks-based-on-payload-sent-to-air), [2](https://stackoverflow.com/questions/73965964/sequentially-executed-dynamic-tasks-in-airflow), [3](https://stackoverflow.com/questions/77354558/dynamic-task-creation-in-airflow-dag), [4](https://stackoverflow.com/questions/77283119/how-to-correctly-execute-multiple-airflow-operators-sequentially-in-a-for-loop-i)).
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   