uranusjr opened a new issue, #24021:
URL: https://github.com/apache/airflow/issues/24021

   ### Body
   
   We need a way to expand a task over inputs aggregated from different 
upstream tasks in a “zipped” fashion, rather than as the cartesian product 
that `expand()` produces. This is currently doable for taskflow tasks by 
injecting a task that performs the zip operation and unpacking the zipped 
values manually:
   
   ```python
   from airflow.decorators import task
   
   @task
   def up_1():
       return ["a", "b"]
   
   @task
   def up_2():
       return ["x", "y"]
   
   @task
   def aggregate(v1, v2):  # The intermediate job to zip.
       return list(zip(v1, v2))
   
   @task
   def output(v):
       print(v)
   
   aggregated = aggregate(v1=up_1(), v2=up_2())
   
   output.expand(v=aggregated)
   # Creates two mapped tasks printing
   # ["a", "x"]
   # ["b", "y"]
   ```
   
   However, the same cannot be done with a classic operator class, since 
Python does not offer a way to unpack a zipped item into keyword arguments.
   
   Furthermore, the intermediate aggregation task (the one that performs the 
zip) can introduce significant overhead, since it needs to create a new 
process and load the entire lists from the upstream tasks, which is one of 
the things task mapping aims to avoid where possible.
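
   To make the difference between the two expansion modes concrete, here is a 
plain-Python sketch that involves no Airflow at all. The variable names mirror 
the example above; the dict-per-task representation is only an illustration of 
which argument combinations each mode would produce.

```python
from itertools import product

up_1 = ["a", "b"]
up_2 = ["x", "y"]

# Cartesian product: every combination of the two inputs.
cartesian = [{"v1": a, "v2": b} for a, b in product(up_1, up_2)]

# Zipped: the two inputs are paired by position.
zipped = [{"v1": a, "v2": b} for a, b in zip(up_1, up_2)]

print(len(cartesian))  # 4 combinations
print(len(zipped))     # 2 pairs
```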
   
   ----
   
   ## Implement `expand_kwargs` on classic operators
   
   To make zip unpacking possible for classic operators, we will add a new 
method `expand_kwargs` that works similarly to `expand`, but takes a single 
list of dicts instead. The task expands against the list and unpacks each dict 
into the operator’s keyword arguments. For example:
   
   ```python
   @task
   def to_dict(v):
       return {"namespace": v[0], "image": v[1]}
   
   kwargs = to_dict.expand(v=aggregated)  # One to_dict call per zipped pair.
   KubernetesPodOperator.partial(task_id="kube").expand_kwargs(kwargs)
   # Produces two mapped tasks:
   # KubernetesPodOperator(namespace="a", image="x")
   # KubernetesPodOperator(namespace="b", image="y")
   ```
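
   The unpacking behavior described above can be sketched in plain Python. 
This is only an illustration of the intended semantics, not the proposed 
implementation: `FakePodOperator` and `expand_kwargs_sketch` are hypothetical 
names invented for this example and are not part of Airflow.

```python
from dataclasses import dataclass


@dataclass
class FakePodOperator:
    """Hypothetical stand-in for a classic operator; not an Airflow class."""

    task_id: str
    namespace: str
    image: str


def expand_kwargs_sketch(operator_cls, partial_kwargs, list_of_dicts):
    """Build one operator per dict, unpacking each dict as keyword arguments."""
    return [operator_cls(**partial_kwargs, **kwargs) for kwargs in list_of_dicts]


mapped = expand_kwargs_sketch(
    FakePodOperator,
    {"task_id": "kube"},  # Plays the role of the arguments given to partial().
    [{"namespace": "a", "image": "x"}, {"namespace": "b", "image": "y"}],
)
for op in mapped:
    print(op)
```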
   
   -----
   
   ## Implement `map()` on `XComArg`
   
   To reduce the overhead of converting upstream values into a zipped _dict_, 
we will implement additional methods on `XComArg`. This will be done by adding 
a “modifier” callable to `XComArg` that is called when the `XComArg` is being 
resolved, before the downstream consumer is executed. This means we can 
rewrite the above as:
   
   ```python
   def to_dict(v):
       return {"namespace": v[0], "image": v[1]}
   
   kwargs = aggregated.map(to_dict)
   KubernetesPodOperator.partial(task_id="kube").expand_kwargs(kwargs)
   # Same result as above!
   ```
   
   Note that `to_dict` in this version is _not a task_. This essentially 
“merges” the previous `to_dict` task into the `KubernetesPodOperator` task and 
eliminates the extra process otherwise needed for this trivial operation.
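
   The “modifier” idea can be sketched as a small lazy wrapper in plain 
Python. Everything here is an assumption made for illustration: `LazyArg`, 
its `pull` callable, and `resolve()` are hypothetical names, and this is not 
how `XComArg` is actually implemented. The point is only that `map()` records 
the callable and runs it later, at resolution time, in the consumer’s process.

```python
from typing import Any, Callable, Iterable, Iterator, Tuple


class LazyArg:
    """Hypothetical stand-in for XComArg; not Airflow's implementation."""

    def __init__(
        self,
        pull: Callable[[], Iterable[Any]],
        modifiers: Tuple[Callable[[Any], Any], ...] = (),
    ) -> None:
        self._pull = pull            # Fetches the upstream values when called.
        self._modifiers = modifiers  # Callables applied to each value on resolve.

    def map(self, func: Callable[[Any], Any]) -> "LazyArg":
        # Only record the callable; nothing is executed at this point.
        return LazyArg(self._pull, self._modifiers + (func,))

    def resolve(self) -> Iterator[Any]:
        # Apply every recorded modifier, in order, to each upstream value.
        for item in self._pull():
            for func in self._modifiers:
                item = func(item)
            yield item


calls = []


def to_dict(v):
    calls.append(v)  # Track invocations to show that map() is lazy.
    return {"namespace": v[0], "image": v[1]}


aggregated = LazyArg(lambda: [["a", "x"], ["b", "y"]])
kwargs = aggregated.map(to_dict)
called_before_resolve = len(calls)  # Still 0: to_dict has not run yet.
resolved = list(kwargs.resolve())
print(resolved)
```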
   
   It is also conceivable to implement other functional helpers, such as 
`flat_map()`, on `XComArg` in the future.
   
   ### Committer
   
   - [X] I acknowledge that I am a maintainer/committer of the Apache Airflow 
project.

