AndersonReyes commented on pull request #10349:
URL: https://github.com/apache/airflow/pull/10349#issuecomment-675211484
I can't quite figure a clean unpacking of the xcomargs without knowing the
size of the output in advance. Right now i have this which is not clean, infer
the number of outputs from the typing and pass that to the _PythonOperator but
still brainstorming on how to do that unpacking or if something else got a
solution will prob leave this for another pr.
```
def _infer_multiple_outputs(
python_callable: Optional[Callable] = None,
n_outputs: Optional[int] = None,
multiple_outputs: bool = False,
) -> Tuple[bool, Union[None, int]]:
"""
Try to infer multiple outputs and number of outputs from typing.
This a hack really and only works for tuples.
"""
if not python_callable:
return multiple_outputs, n_outputs
sig = signature(python_callable).return_annotation
ttype = getattr(sig, "__origin__", None)
if (
sig != inspect.Signature.empty
and is_container(ttype)
):
multiple_outputs = True
# see if we can infer the number of outputs
type_args = sig.__args__
if (not n_outputs )and (ttype in (Tuple, tuple)) and (Ellipsis not
in type_args):
n_outputs = len(type_args)
return multiple_outputs, n_outputs
def task(
python_callable: Optional[Callable] = None,
multiple_outputs: bool = False,
n_outputs: Optional[int] = None,
**kwargs
) -> Callable[[T], T]:
"""
Python operator decorator. Wraps a function into an Airflow operator.
Accepts kwargs for operator kwarg. Can be reused in a single DAG.
:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom
values
with index as key. Dict will unroll to xcom values with keys as XCom
keys.
Defaults to False.
:type multiple_outputs: bool
"""
multiple_outputs, n_outputs = _infer_multiple_outputs(
python_callable=python_callable, n_outputs=n_outputs,
multiple_outputs=multiple_outputs)
def wrapper(f: T):
"""
Python wrapper to generate PythonFunctionalOperator out of simple
python functions.
Used for Airflow functional interface
"""
_PythonFunctionalOperator.validate_python_callable(f)
kwargs.setdefault('task_id', f.__name__)
@functools.wraps(f)
def factory(*args, **f_kwargs):
op = _PythonFunctionalOperator(python_callable=f, op_args=args,
op_kwargs=f_kwargs,
multiple_outputs=multiple_outputs, n_outputs=n_outputs,
**kwargs)
return XComArg(op)
return cast(T, factory)
if callable(python_callable):
return wrapper(python_callable)
elif python_callable is not None:
raise AirflowException('No args allowed while using @task, use
kwargs instead')
return wrapper
```
and the iter for XcomArg
```
def __iter__(self):
return iter(XComArg(operator=self.operator, key=str(i)) for i in
range(self.operator._n_outputs))
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]