Tehada opened a new issue, #23803:
URL: https://github.com/apache/airflow/issues/23803

   ### Description
   
   Current implementation of [dynamic task 
mapping](https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html)
 as of airflow 2.3.0 is lacking some features to support different workflows. 
Particularly, it lacks "zip-like" functionality to dynamically spawn tasks. 
Consider this example (concept, not an actual code):
   
   ```
   @task
   def get_params():
       res = {
           "arg1": [1, 3],
           "arg2": [2, 4]
       }
       return res
   
   @task
   def run_task(arg1, arg2):
       print(arg1, arg2)
   
   with DAG(dag_id="airflow_fun", start_date=datetime(2022, 5, 16)) as dag:
       p = get_params()
       run_task.expand_zip(arg1=p["arg1"], arg2=p["arg2"])
   ```
   
   The desired behavior for this code is to run `run_task` twice: 
`run_task(arg1=1, arg2=2)` and `run_task(arg1=3, arg2=4)`. It is not possible 
to implement such workflow in current implementation of dynamic tasks using 
`expand` API -- I want such API to appear in airflow.
   
   It is possible, that creating something similar to current `expand` is not 
the best approach and may be there is another fundamentally different way, 
which will be simple to use and implement -- I am going to make some research 
of exising tools with this feature (prefect and dagster coming in mind first, 
luigi seems to be dead).
   
   Also, @ashb noted in slack:
   > The first thing to work out and discuss is what is the syntax to the DAG 
author?
   
   Which leads me to this plan (may be updated later):
   
   1. Gather existing suggestions in this issue for further comparison.
   2. Research goob and bad practices of this API in different tools.
   3. Find out usecases and workflows, which will be not covered by suggested 
API (it is desirable to cover all usecases).
   
   This plan and description are open for any feedback, suggestions, criticism.
   
   ### Use case/motivation
   
   One real-life usecase is to have a task `get_params`, which, when triggered, 
will list gcs bucket for new files for yesterday (number of files is variable 
day-to-day) and generate arguments for `KubernetesPodOperator` to run 
processing task in pod. But the processing is quite RAM-hungry, so I also 
predict RAM usage of particular processing task based on the file size in 
bucket. Then I want to run processing for each file in a separate pod with 
dynamically generated `cmds`, `arguments` and `resources` for each pod. With 
"zip-like" dynamic mapping this usecase will be trivially implementable.
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to