ashb commented on pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#issuecomment-1031573948
The serialization needs some work for this approach. Here is an incomplete
test:
```python
def test_mapped_decorator_serde():
from airflow.models.xcom_arg import XComArg
from airflow.decorators import task
with DAG("test-dag", start_date=datetime(2020, 1, 1)) as dag:
task1 = BaseOperator(task_id="op1")
xcomarg = XComArg(task1, "test_key")
@task(retry_delay=30)
def x(arg1, arg2):
...
real_op = x.partial(arg1=1).map(arg2=xcomarg).operator
serialized = SerializedBaseOperator._serialize(real_op)
assert serialized == {
'_is_dummy': False,
'_is_mapped': True,
'_task_module': '???',
'_task_type': '???',
'downstream_task_ids': [],
'mapped_kwargs': {
'arg1': [
1,
2,
{"__type": "dict", "__var": {'a': 'b'}},
],
'arg2': {'__type': 'xcomref', '__var': {'task_id': 'op1', 'key':
'test_key'}},
},
'task_id': 'x',
# We don't want to include the python source code in the serialized
representation
# TODO? Where does `retry_delay` go? We need to separate
}
op = SerializedBaseOperator.deserialize_operator(serialized)
assert isinstance(op, MappedOperator)
assert op.deps is MappedOperator.DEFAULT_DEPS
# TODO: add some more asserts here
```
Trying to call `real_op.unmap()` also throws a key error:
```
tests/serialization/test_dag_serialization.py:1670: in
test_mapped_decorator_serde
real_op.unmap()
airflow/models/baseoperator.py:1882: in unmap
dag._remove_task(self.task_id)
airflow/models/dag.py:2167: in _remove_task
task = self.task_dict.pop(task_id)
E KeyError: 'x'
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]