Tehada opened a new issue, #23803: URL: https://github.com/apache/airflow/issues/23803
### Description Current implementation of [dynamic task mapping](https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html) as of airflow 2.3.0 is lacking some features to support different workflows. Particularly, it lacks "zip-like" functionality to dynamically spawn tasks. Consider this example (concept, not an actual code): ``` @task def get_params(): res = { "arg1": [1, 3], "arg2": [2, 4] } return res @task def run_task(arg1, arg2): print(arg1, arg2) with DAG(dag_id="airflow_fun", start_date=datetime(2022, 5, 16)) as dag: p = get_params() run_task.expand_zip(arg1=p["arg1"], arg2=p["arg2"]) ``` The desired behavior for this code is to run `run_task` twice: `run_task(arg1=1, arg2=2)` and `run_task(arg1=3, arg2=4)`. It is not possible to implement such workflow in current implementation of dynamic tasks using `expand` API -- I want such API to appear in airflow. It is possible, that creating something similar to current `expand` is not the best approach and may be there is another fundamentally different way, which will be simple to use and implement -- I am going to make some research of exising tools with this feature (prefect and dagster coming in mind first, luigi seems to be dead). Also, @ashb noted in slack: > The first thing to work out and discuss is what is the syntax to the DAG author? Which leads me to this plan (may be updated later): 1. Gather existing suggestions in this issue for further comparison. 2. Research goob and bad practices of this API in different tools. 3. Find out usecases and workflows, which will be not covered by suggested API (it is desirable to cover all usecases). This plan and description are open for any feedback, suggestions, criticism. ### Use case/motivation One real-life usecase is to have a task `get_params`, which, when triggered, will list gcs bucket for new files for yesterday (number of files is variable day-to-day) and generate arguments for `KubernetesPodOperator` to run processing task in pod. But the processing is quite RAM-hungry, so I also predict RAM usage of particular processing task based on the file size in bucket. Then I want to run processing for each file in a separate pod with dynamically generated `cmds`, `arguments` and `resources` for each pod. With "zip-like" dynamic mapping this usecase will be trivially implementable. ### Related issues _No response_ ### Are you willing to submit a PR? - [X] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
