uranusjr opened a new issue, #24021:
URL: https://github.com/apache/airflow/issues/24021
### Body
A need has been established to expand a task against aggregated inputs from
different upstream tasks in a “zipped” fashion, rather than the cartesian product
implemented by `expand()`. This is currently doable for taskflow tasks by
injecting a task that performs the zip operation, and unpacking the zipped values
manually:
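```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

@task
def up_1():
    return ["a", "b"]

@task
def up_2():
    return ["x", "y"]

@task
def aggregate(v1, v2):  # The intermediate task that performs the zip.
    return list(zip(v1, v2))

@task
def output(v):
    print(v)

with DAG(dag_id="zip_example", start_date=datetime(2022, 1, 1)):
    # aggregate is called normally (not expanded) so it receives both full lists.
    aggregated = aggregate(v1=up_1(), v2=up_2())
    output.expand(v=aggregated)
    # Creates two mapped tasks printing
    # ["a", "x"]
    # ["b", "y"]
```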
However, the same cannot be done for a classic operator class, since Python
does not offer a way to unpack a zipped item into keyword arguments.
Furthermore, the intermediate aggregation task (the one that performs the zip)
can introduce significant overhead, since it needs to create a new process and
load the entire lists from the upstream tasks, which is one of the things task
mapping aims to avoid where possible.
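For illustration only, here is a plain-Python sketch (no Airflow involved) of the difference between the two expansion strategies. It assumes, as described above, that `expand()` creates one mapped instance per combination of its keyword arguments, while the requested “zipped” behaviour pairs items by position.

```python
from itertools import product

# Upstream results from the taskflow example above.
up_1 = ["a", "b"]
up_2 = ["x", "y"]

# expand(v1=..., v2=...): one mapped instance per combination (cartesian product).
cartesian = [{"v1": v1, "v2": v2} for v1, v2 in product(up_1, up_2)]

# The requested "zipped" behaviour: one mapped instance per positional pair.
zipped = [{"v1": v1, "v2": v2} for v1, v2 in zip(up_1, up_2)]

print(len(cartesian))  # 4
print(len(zipped))     # 2
```

With two items per upstream, the cartesian product yields four instances while zipping yields two; the gap grows multiplicatively as more upstream lists are added.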
----
## Implement `expand_kwargs` on classic operators
To make zip unpacking possible for classic operators, we will add a new
method `expand_kwargs` that works similarly to `expand`, but takes a single
list of dicts instead. The task expands against the list and unpacks each dict
into the operator’s keyword arguments. For example:
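```python
from airflow.decorators import task
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

@task
def to_dict(v):
    return {"namespace": v[0], "image": v[1]}

# Continues the DAG above; to_dict is expanded so it runs once per zipped pair.
kwargs = to_dict.expand(v=aggregated)
KubernetesPodOperator.partial(task_id="kube").expand_kwargs(kwargs)
# Produces two mapped tasks:
# KubernetesPodOperator(namespace="a", image="x")
# KubernetesPodOperator(namespace="b", image="y")
```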
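The expansion semantics described above can be modeled in plain Python. This is a hypothetical sketch, not Airflow's implementation: `FakeOperator` and `expand_kwargs_sketch` are illustrative names, and real mapping happens at run time against XCom values rather than on an in-memory list. It shows why a list of dicts works where zipped tuples do not: each dict carries argument names, so it can be unpacked with `**`.

```python
from typing import Any, Dict, List


class FakeOperator:
    """Hypothetical stand-in for a classic operator; it just records its kwargs."""

    def __init__(self, task_id: str, **kwargs: Any) -> None:
        self.task_id = task_id
        self.kwargs = kwargs


def expand_kwargs_sketch(
    partial_kwargs: Dict[str, Any], mapped: List[Dict[str, Any]]
) -> List[FakeOperator]:
    """Create one operator per dict, combining the partial() kwargs with that dict.

    If a key appears in both, Python itself raises TypeError
    ("got multiple values for keyword argument").
    """
    return [FakeOperator(**partial_kwargs, **item) for item in mapped]


kwargs = [{"namespace": "a", "image": "x"}, {"namespace": "b", "image": "y"}]
tasks = expand_kwargs_sketch({"task_id": "kube"}, kwargs)
for t in tasks:
    print(t.task_id, t.kwargs)
# kube {'namespace': 'a', 'image': 'x'}
# kube {'namespace': 'b', 'image': 'y'}
```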
-----
## Implement `map()` on `XComArg`
To reduce the overhead of converting the zipped upstream values into _dicts_, we
will implement additional methods on `XComArg`. This will be done by adding a
“modifier” callable to `XComArg` that is called when the `XComArg` is resolved,
just before the downstream consumer is executed. This means we can rewrite
the above as:
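```python
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

def to_dict(v):  # A plain function, not a @task.
    return {"namespace": v[0], "image": v[1]}

# map() attaches to_dict to the XComArg; it is applied to each item at resolution time.
kwargs = aggregated.map(to_dict)
KubernetesPodOperator.partial(task_id="kube").expand_kwargs(kwargs)
# Same result as above!
```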
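The “modifier” idea can be illustrated with a simplified, hypothetical model. `XComArgSketch` is not Airflow's `XComArg`: it assumes the upstream result is an in-memory list and that each mapped downstream instance resolves one item by its map index. The point it shows is that `map()` only records the callable, and the work happens inside the consumer when the value is resolved.

```python
from typing import Any, Callable, List, Optional


class XComArgSketch:
    """Hypothetical, simplified model of an XComArg carrying a "modifier" callable.

    `values` stands in for the upstream task's pushed XCom (a list to map over).
    """

    def __init__(
        self, values: List[Any], modifier: Optional[Callable[[Any], Any]] = None
    ) -> None:
        self._values = values
        self._modifier = modifier

    def map(self, f: Callable[[Any], Any]) -> "XComArgSketch":
        # Nothing runs here: return a new object whose modifier applies f
        # after any modifier that was already attached.
        if self._modifier is None:
            return XComArgSketch(self._values, f)
        prev = self._modifier
        return XComArgSketch(self._values, lambda v: f(prev(v)))

    def resolve(self, map_index: int) -> Any:
        # Called when a downstream mapped instance runs: fetch its item,
        # then apply the modifier in that same process.
        value = self._values[map_index]
        return value if self._modifier is None else self._modifier(value)


def to_dict(v):
    return {"namespace": v[0], "image": v[1]}


aggregated = XComArgSketch([["a", "x"], ["b", "y"]])
kwargs = aggregated.map(to_dict)
print(kwargs.resolve(0))  # {'namespace': 'a', 'image': 'x'}
print(kwargs.resolve(1))  # {'namespace': 'b', 'image': 'y'}
```

Because `map()` returns a new object instead of mutating the original, `aggregated` itself still resolves to the raw pairs and can be consumed elsewhere unchanged.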
Note that `to_dict` in this version is _not a task_. This essentially
“merges” the previous `to_dict` task into the `KubernetesPodOperator` task,
eliminating the extra processes needed for such a trivial operation.
It is also conceivable to implement other functional helpers on `XComArg`,
such as `flat_map()`, in the future.
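The issue does not define `flat_map()`. Assuming it would follow the conventional functional-programming meaning (apply a function that returns an iterable to each item, then flatten one level), a plain-Python sketch of what it would compute looks like this; `flat_map_sketch` and the image-tag transform are hypothetical.

```python
from typing import Any, Callable, Iterable, List


def flat_map_sketch(values: List[Any], f: Callable[[Any], Iterable[Any]]) -> List[Any]:
    """Apply f to each item and flatten the results one level."""
    return [out for item in values for out in f(item)]


# Hypothetical transform: each zipped pair yields two image tags.
upstream = [["a", "x"], ["b", "y"]]
result = flat_map_sketch(upstream, lambda v: [f"{v[1]}:1.0", f"{v[1]}:2.0"])
print(result)  # ['x:1.0', 'x:2.0', 'y:1.0', 'y:2.0']
```

Unlike `map()`, this changes the number of items, so the number of downstream mapped instances would depend on the function's output rather than on the upstream list length.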
### Committer
- [X] I acknowledge that I am a maintainer/committer of the Apache Airflow
project.