ashb commented on pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#issuecomment-1031573948


   The serialization needs some work for this approach. Here is an incomplete 
test:
   
   ```python
   def test_mapped_decorator_serde():
       from airflow.models.xcom_arg import XComArg
       from airflow.decorators import task
   
       with DAG("test-dag", start_date=datetime(2020, 1, 1)) as dag:
           task1 = BaseOperator(task_id="op1")
           xcomarg = XComArg(task1, "test_key")
   
           @task(retry_delay=30)
           def x(arg1, arg2):
               ...
   
       real_op = x.partial(arg1=1).map(arg2=xcomarg).operator
   
       serialized = SerializedBaseOperator._serialize(real_op)
   
       assert serialized == {
           '_is_dummy': False,
           '_is_mapped': True,
           '_task_module': '???',
           '_task_type': '???',
           'downstream_task_ids': [],
           'mapped_kwargs': {
               'arg1': [
                   1,
                   2,
                   {"__type": "dict", "__var": {'a': 'b'}},
               ],
               'arg2': {'__type': 'xcomref', '__var': {'task_id': 'op1', 'key': 
'test_key'}},
           },
           'task_id': 'x',
           # We don't want to include the python source code in the serialized 
representation
           # TODO? Where does `retry_delay` go? We need to separate 
       }
   
       op = SerializedBaseOperator.deserialize_operator(serialized)
       assert isinstance(op, MappedOperator)
       assert op.deps is MappedOperator.DEFAULT_DEPS
   
       # TODO: add some more asserts here
   ```
   
   Trying to call `real_op.unmap()` also throws a key error:
   
   
   ```
   tests/serialization/test_dag_serialization.py:1670: in 
test_mapped_decorator_serde
       real_op.unmap()
   airflow/models/baseoperator.py:1882: in unmap
       dag._remove_task(self.task_id)
   airflow/models/dag.py:2167: in _remove_task
       task = self.task_dict.pop(task_id)
   E   KeyError: 'x'
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to