manugarri opened a new issue, #37119:
URL: https://github.com/apache/airflow/issues/37119
### Description
Dynamic tasks let a DAG create tasks from a number of inputs that is only
known at run time. This is great because it makes DAGs flexible and also
reduces the workload on the scheduler, since the tasks are defined at run time.
However, dynamic tasks currently run all inputs in parallel (via the
`expand` function). There are cases where a list of steps needs to run
sequentially, but **the list is not known until runtime**.
### Use case/motivation
It is helpful to show an actual use case to see why the feature would be
useful.
I parse a YAML file describing some EMR steps inside a `@task`, and I cannot
create tasks from inside another task with a for loop, so I have no way to
create the operators that run the operations described in the YAML file.
The DAG looks more or less like this:
```
from airflow import DAG
from airflow.decorators import task, task_group
from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
    EmrTerminateJobFlowOperator,
)

with DAG(...):

    @task
    def parse_yaml(yaml_path):
        """
        Read the templated yaml and expand the jinja variables inside.
        This yaml has blocks of steps that have to run sequentially,
        something like this:

        groups:
          - group_name: 1
            steps:
              - step_name: 1.1
                step_emr_command: spark-submit ...
              - step_name: 1.2
                step_emr_command: spark-submit ...
          - group_name: 2
            steps:
              - step_name: 2.1
                step_emr_command: spark-submit ...
              - step_name: 2.2
                step_emr_command: spark-submit ...
        """
        # parsing logic omitted; returns the list of group definitions
        return parsed_yaml

    @task_group
    def run_group(group_definition):
        # pseudocode: required operator arguments (task_id, job_flow_id, ...) omitted
        setup_emr_cluster = EmrCreateJobFlowOperator()
        run_steps = EmrAddStepsOperator(group_definition['steps'])
        terminate_emr_cluster = EmrTerminateJobFlowOperator()
        setup_emr_cluster >> run_steps >> terminate_emr_cluster

    parsed_yaml = parse_yaml(yaml_path)
    # this runs all the step groups in parallel (bad)
    run_group.expand(group_definition=parsed_yaml)
```
Hopefully this pseudo-example shows what I'm trying to achieve. I need
`run_group` for group 1 to finish before `run_group` for group 2 starts,
rather than both running in parallel.
This is the graph that should run for the YAML file above:
```
┌────────────┐
│ parse_yaml │
└─────┬──────┘
      ▼
┌─ Group 1 ────────────────────────────────────────────────────────┐
│ EMRCreate Cluster1 ─► step 1.1 ─► step 1.2 ─► Terminate Cluster1 │
└─────┬────────────────────────────────────────────────────────────┘
      ▼
┌─ Group 2 ────────────────────────────────────────────────────────┐
│ EMRCreate Cluster2 ─► step 2.1 ─► step 2.2 ─► Terminate Cluster2 │
└──────────────────────────────────────────────────────────────────┘
```
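To make the requested ordering concrete, here is a minimal plain-Python sketch (it does not use any Airflow API) that turns the parsed group list into the chain of dependencies shown in the diagram. The function name `sequential_edges` and the task names it generates (`create_cluster_1`, `step_1.1`, ...) are hypothetical and only serve the illustration; the input mirrors the YAML structure from the example above.

```python
from typing import Any


def sequential_edges(groups: list[dict[str, Any]]) -> list[tuple[str, str]]:
    """Return (upstream, downstream) pairs that run the groups one after another."""
    edges = []
    previous = "parse_yaml"  # every chain starts from the parsing task
    for group in groups:
        name = group["group_name"]
        # Tasks inside one group: create cluster, each step, terminate cluster.
        chain = [f"create_cluster_{name}"]
        chain += [f"step_{step['step_name']}" for step in group["steps"]]
        chain.append(f"terminate_cluster_{name}")
        # Link each task to the one before it; the last task of a group
        # becomes the upstream of the first task of the next group.
        for task_name in chain:
            edges.append((previous, task_name))
            previous = task_name
    return edges


# Hypothetical parsed result, shaped like the YAML in the docstring above.
groups = [
    {"group_name": 1, "steps": [{"step_name": "1.1"}, {"step_name": "1.2"}]},
    {"group_name": 2, "steps": [{"step_name": "2.1"}, {"step_name": "2.2"}]},
]

for upstream, downstream in sequential_edges(groups):
    print(f"{upstream} -> {downstream}")
```

The edge `terminate_cluster_1 -> create_cluster_2` is the one that `expand` does not produce today, since mapped instances have no dependency on each other.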
### Related issues
There was a previous unanswered discussion on the repo
([link](https://github.com/apache/airflow/discussions/26093)).
There seem to be some related questions on StackOverflow
([1](https://stackoverflow.com/questions/75190990/design-airflow-pipeline-to-perform-sequential-tasks-based-on-payload-sent-to-air),
[2](https://stackoverflow.com/questions/73965964/sequentially-executed-dynamic-tasks-in-airflow),
[3](https://stackoverflow.com/questions/77354558/dynamic-task-creation-in-airflow-dag),
[4](https://stackoverflow.com/questions/77283119/how-to-correctly-execute-multiple-airflow-operators-sequentially-in-a-for-loop-i)).
### Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)