sc-anssi opened a new issue, #35361:
URL: https://github.com/apache/airflow/issues/35361
### Apache Airflow version
2.7.2
### What happened
Trying to map over a dict instead of a list results in a `KeyError`. The
following traceback is produced when running the DAG below with Python 3.11:
```
[2023-11-01T23:37:35.428+0100] {dag.py:3966} INFO - dagrun id: test
[2023-11-01T23:37:35.438+0100] {dag.py:3982} INFO - created dagrun <DagRun
test @ 2023-11-01T22:37:35.362926+00:00:
manual__2023-11-01T22:37:35.362926+00:00, state:running, queued_at: None.
externally triggered: False>
[2023-11-01T23:37:35.442+0100] {dag.py:3930} INFO -
*****************************************************
[2023-11-01T23:37:35.443+0100] {dag.py:3934} INFO - Running task get_dict
[2023-11-01 23:37:35,509] {taskinstance.py:1662} INFO - Exporting env vars:
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test'
AIRFLOW_CTX_TASK_ID='get_dict'
AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T22:37:35.362926+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T22:37:35.362926+00:00'
[2023-11-01T23:37:35.509+0100] {taskinstance.py:1662} INFO - Exporting env
vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test'
AIRFLOW_CTX_TASK_ID='get_dict'
AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T22:37:35.362926+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T22:37:35.362926+00:00'
[2023-11-01 23:37:35,511] {python.py:194} INFO - Done. Returned value was:
{'key1': ('item1', 'item2'), 'key2': ('item3', 'item4')}
[2023-11-01T23:37:35.511+0100] {python.py:194} INFO - Done. Returned value
was: {'key1': ('item1', 'item2'), 'key2': ('item3', 'item4')}
[2023-11-01 23:37:35,533] {taskinstance.py:1400} INFO - Marking task as
SUCCESS. dag_id=test, task_id=get_dict, execution_date=20231101T223735,
start_date=, end_date=20231101T223735
[2023-11-01T23:37:35.533+0100] {taskinstance.py:1400} INFO - Marking task as
SUCCESS. dag_id=test, task_id=get_dict, execution_date=20231101T223735,
start_date=, end_date=20231101T223735
[2023-11-01T23:37:35.552+0100] {dag.py:3938} INFO - get_dict ran
successfully!
[2023-11-01T23:37:35.553+0100] {dag.py:3941} INFO -
*****************************************************
[2023-11-01T23:37:35.572+0100] {dag.py:3930} INFO -
*****************************************************
[2023-11-01T23:37:35.572+0100] {dag.py:3934} INFO - Running task print_msg
[2023-11-01 23:37:35,595] {taskinstance.py:1937} ERROR - Task failed with
exception
Traceback (most recent call last):
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode, session=session)
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 1647, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2285, in render_templates
    original_task.render_template_fields(context)
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
    mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/decorators/base.py", line 528, in _expand_mapped_kwargs
    op_kwargs, resolved_oids = super()._expand_mapped_kwargs(context, session)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 584, in _expand_mapped_kwargs
    return self._get_specified_expand_input().resolve(context, session)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/expandinput.py", line 200, in resolve
    data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/expandinput.py", line 200, in <dictcomp>
    data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/expandinput.py", line 184, in _expand_mapped_field
    return value[found_index]
           ~~~~~^^^^^^^^^^^^^
  File "/home/user/virtualenvs/airflow/lib/python3.11/site-packages/airflow/models/xcom_arg.py", line 455, in __getitem__
    value = self.value[index]
            ~~~~~~~~~~^^^^^^^
KeyError: 0
[...]
```
### What you think should happen instead
Just as it is possible to expand over dict data, it should be possible to map
over dict data as well. I believe mapped tasks are passed a `(key, value)`
tuple as their argument when using `expand()` over a dict, and it seems
reasonable to expect the same from the `map()` method.
According to the
[documentation](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#transforming-expanding-data)
the data only needs to be iterable (and dicts are), and according to the
[code](https://github.com/apache/airflow/blob/2.7.2/airflow/models/xcom_arg.py#L450)
dict is a valid type for a `_MapResult` value.
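The expected semantics can be sketched in plain Python. This is an assumption about the desired behavior (not Airflow's actual implementation): a dict becomes indexable by converting it to a list of `(key, value)` items, so each map index yields one tuple. The helper name `item_at` is hypothetical.

```python
def item_at(value, index):
    """Return the element a mapped task at map_index=index would receive.

    Assumed semantics: dicts yield (key, value) tuples in insertion order,
    sequences yield their elements, matching what expand() is believed to do.
    """
    if isinstance(value, dict):
        return list(value.items())[index]
    return value[index]


d = {"key1": ("item1", "item2"), "key2": ("item3", "item4")}
print(item_at(d, 0))  # ('key1', ('item1', 'item2'))
print(item_at(["item1", "item2"], 1))  # item2
```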
### How to reproduce
```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task


def dummy(arg):
    return arg


with DAG(
    "test",
    start_date=datetime(2021, 5, 1),
    catchup=False,
) as dag:

    @task
    def get_dict():
        return {
            "key1": ("item1", "item2"),
            "key2": ("item3", "item4"),
        }

    @task
    def get_list():
        return ["item1", "item2"]

    @task
    def print_msg(arg):
        print(arg)

    # Works with a list
    # print_msg.expand(arg=get_list().map(dummy))

    # Does not work with a dict
    print_msg.expand(arg=get_dict().map(dummy))

if __name__ == "__main__":
    dag.test()
```
### Operating System
Ubuntu 23.04
### Versions of Apache Airflow Providers
_No response_
### Deployment
Virtualenv installation
### Deployment details
_No response_
### Anything else
It is possible to work around the issue with the following (ugly) patch, which
will probably break something else somewhere:
```diff
diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index a19aa6703f..c87d522771 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -452,7 +452,10 @@ class _MapResult(Sequence):
         self.callables = callables
 
     def __getitem__(self, index: Any) -> Any:
-        value = self.value[index]
+        if isinstance(self.value, dict):
+            value = list(self.value.items())[index]
+        else:
+            value = self.value[index]
         # In the worker, we can access all actual callables. Call them.
         callables = [f for f in self.callables if callable(f)]
```
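To see what the patch does in isolation, here is a minimal stand-in for `_MapResult` with the patched `__getitem__` (simplified and hypothetical: the real class also resolves XCom values and filters out serialized, non-callable entries):

```python
from collections.abc import Sequence


class MapResultSketch(Sequence):
    """Simplified stand-in for Airflow's _MapResult, with the patch applied."""

    def __init__(self, value, callables):
        self.value = value
        self.callables = callables

    def __getitem__(self, index):
        # The patch: index into a dict via its list of (key, value) items.
        if isinstance(self.value, dict):
            value = list(self.value.items())[index]
        else:
            value = self.value[index]
        # Apply the chained map() callables, as the real class does.
        for f in self.callables:
            value = f(value)
        return value

    def __len__(self):
        return len(self.value)


result = MapResultSketch(
    {"key1": ("item1", "item2"), "key2": ("item3", "item4")},
    [lambda x: x],  # stand-in for dummy()
)
print(result[0])  # ('key1', ('item1', 'item2'))
```

With this indexing in place, each mapped `print_msg` instance receives one `(key, value)` tuple, matching the printed output in the log below.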
Running the same DAG with this patch gives the following trace:
```
[2023-11-02T00:14:34.731+0100] {dag.py:3966} INFO - dagrun id: test
[2023-11-02T00:14:34.740+0100] {dag.py:3982} INFO - created dagrun <DagRun
test @ 2023-11-01T23:14:34.665846+00:00:
manual__2023-11-01T23:14:34.665846+00:00, state:running, queued_at: None.
externally triggered: False>
[2023-11-02T00:14:34.744+0100] {dag.py:3930} INFO -
*****************************************************
[2023-11-02T00:14:34.744+0100] {dag.py:3934} INFO - Running task get_dict
[2023-11-02 00:14:34,799] {taskinstance.py:1662} INFO - Exporting env vars:
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test'
AIRFLOW_CTX_TASK_ID='get_dict'
AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
[2023-11-02T00:14:34.799+0100] {taskinstance.py:1662} INFO - Exporting env
vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test'
AIRFLOW_CTX_TASK_ID='get_dict'
AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
[2023-11-02 00:14:34,801] {python.py:194} INFO - Done. Returned value was:
{'key1': ('item1', 'item2'), 'key2': ('item3', 'item4')}
[2023-11-02T00:14:34.801+0100] {python.py:194} INFO - Done. Returned value
was: {'key1': ('item1', 'item2'), 'key2': ('item3', 'item4')}
[2023-11-02 00:14:34,820] {taskinstance.py:1400} INFO - Marking task as
SUCCESS. dag_id=test, task_id=get_dict, execution_date=20231101T231434,
start_date=, end_date=20231101T231434
[2023-11-02T00:14:34.820+0100] {taskinstance.py:1400} INFO - Marking task as
SUCCESS. dag_id=test, task_id=get_dict, execution_date=20231101T231434,
start_date=, end_date=20231101T231434
[2023-11-02T00:14:34.837+0100] {dag.py:3938} INFO - get_dict ran
successfully!
[2023-11-02T00:14:34.838+0100] {dag.py:3941} INFO -
*****************************************************
[2023-11-02T00:14:34.852+0100] {dag.py:3930} INFO -
*****************************************************
[2023-11-02T00:14:34.852+0100] {dag.py:3934} INFO - Running task print_msg
[2023-11-02 00:14:34,895] {taskinstance.py:1662} INFO - Exporting env vars:
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test'
AIRFLOW_CTX_TASK_ID='print_msg'
AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
[2023-11-02T00:14:34.895+0100] {taskinstance.py:1662} INFO - Exporting env
vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test'
AIRFLOW_CTX_TASK_ID='print_msg'
AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
('key1', ['item1', 'item2'])
[2023-11-02 00:14:34,896] {python.py:194} INFO - Done. Returned value was:
None
[2023-11-02T00:14:34.896+0100] {python.py:194} INFO - Done. Returned value
was: None
[2023-11-02 00:14:34,897] {taskinstance.py:1400} INFO - Marking task as
SUCCESS. dag_id=test, task_id=print_msg, map_index=0,
execution_date=20231101T231434, start_date=, end_date=20231101T231434
[2023-11-02T00:14:34.897+0100] {taskinstance.py:1400} INFO - Marking task as
SUCCESS. dag_id=test, task_id=print_msg, map_index=0,
execution_date=20231101T231434, start_date=, end_date=20231101T231434
[2023-11-02T00:14:34.912+0100] {dag.py:3938} INFO - print_msg ran
successfully!
[2023-11-02T00:14:34.913+0100] {dag.py:3941} INFO -
*****************************************************
[2023-11-02T00:14:34.913+0100] {dag.py:3930} INFO -
*****************************************************
[2023-11-02T00:14:34.913+0100] {dag.py:3932} INFO - Running task print_msg
index 1
[2023-11-02 00:14:34,956] {taskinstance.py:1662} INFO - Exporting env vars:
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test'
AIRFLOW_CTX_TASK_ID='print_msg'
AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
[2023-11-02T00:14:34.956+0100] {taskinstance.py:1662} INFO - Exporting env
vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test'
AIRFLOW_CTX_TASK_ID='print_msg'
AIRFLOW_CTX_EXECUTION_DATE='2023-11-01T23:14:34.665846+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-11-01T23:14:34.665846+00:00'
('key2', ['item3', 'item4'])
[2023-11-02 00:14:34,956] {python.py:194} INFO - Done. Returned value was:
None
[2023-11-02T00:14:34.956+0100] {python.py:194} INFO - Done. Returned value
was: None
[2023-11-02 00:14:34,957] {taskinstance.py:1400} INFO - Marking task as
SUCCESS. dag_id=test, task_id=print_msg, map_index=1,
execution_date=20231101T231434, start_date=, end_date=20231101T231434
[2023-11-02T00:14:34.957+0100] {taskinstance.py:1400} INFO - Marking task as
SUCCESS. dag_id=test, task_id=print_msg, map_index=1,
execution_date=20231101T231434, start_date=, end_date=20231101T231434
[2023-11-02T00:14:34.971+0100] {dag.py:3938} INFO - print_msg ran
successfully!
[2023-11-02T00:14:34.971+0100] {dag.py:3941} INFO -
*****************************************************
[2023-11-02T00:14:34.974+0100] {dagrun.py:653} INFO - Marking run <DagRun
test @ 2023-11-01T23:14:34.665846+00:00:
manual__2023-11-01T23:14:34.665846+00:00, state:running, queued_at: None.
externally triggered: False> successful
[2023-11-02T00:14:34.974+0100] {dagrun.py:704} INFO - DagRun Finished:
dag_id=test, execution_date=2023-11-01T23:14:34.665846+00:00,
run_id=manual__2023-11-01T23:14:34.665846+00:00, run_start_date=2023-11-01
23:14:34.665846+00:00, run_end_date=2023-11-01 23:14:34.974797+00:00,
run_duration=0.308951, state=success, external_trigger=False, run_type=manual,
data_interval_start=2023-10-31T23:14:34.665846+00:00,
data_interval_end=2023-11-01T23:14:34.665846+00:00, dag_hash=None
```
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)