szeswee opened a new issue, #43214: URL: https://github.com/apache/airflow/issues/43214
### Apache Airflow version

Other Airflow 2 version (please specify below)

### If "Other Airflow 2 version" selected, which one?

- 2.9.2
- 2.10.4

### What happened?

The [Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#automatically-skipping-zero-length-maps) state the following behaviour when encountering zero-length maps when using Dynamic Task Mapping:

> If the input is empty (zero length), no new tasks will be created and the mapped task will be marked as `SKIPPED`.

The abovementioned behaviour is indeed correctly observed when a mapped task is first executed as part of a new DAG run (i.e. `try_number = 1`).

However, on subsequent tries (i.e. `try_number > 1`), the mapped task will instead throw the following exception:

```
[2024-10-21, 10:54:51 UTC] {taskinstance.py:2306} INFO - Starting attempt 2 of 2
[2024-10-21, 10:54:51 UTC] {taskinstance.py:2330} INFO - Executing <Mapped(PythonOperator): print_args> on 2024-10-21 10:54:27.291897+00:00
[2024-10-21, 10:54:51 UTC] {standard_task_runner.py:63} INFO - Started process 341 to run task
[2024-10-21, 10:54:51 UTC] {standard_task_runner.py:90} INFO - Running: ['airflow', 'tasks', 'run', 'dtm_failure', 'print_args', 'manual__2024-10-21T10:54:27.291897+00:00', '--job-id', '13517', '--raw', '--subdir', 'DAGS_FOLDER/dtm_failure.py', '--cfg-path', '/tmp/tmpahwek3h8', '--map-index', '0']
[2024-10-21, 10:54:51 UTC] {standard_task_runner.py:91} INFO - Job 13517: Subtask print_args
[2024-10-21, 10:54:51 UTC] {task_command.py:426} INFO - Running <TaskInstance: dtm_failure.print_args manual__2024-10-21T10:54:27.291897+00:00 map_index=0 [running]> on host 172.21.174.13
[2024-10-21, 10:54:52 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2479, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode, session=session)
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2633, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context, jinja_env=jinja_env)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3094, in render_templates
    original_task.render_template_fields(context, jinja_env)
  File "/usr/local/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 829, in render_template_fields
    mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 688, in _expand_mapped_kwargs
    return self._get_specified_expand_input().resolve(context, session)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 202, in resolve
    data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 202, in <dictcomp>
    data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 182, in _expand_mapped_field
    found_index = _find_index_for_this_field(map_index)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 176, in _find_index_for_this_field
    raise RuntimeError(f"cannot expand field mapped to length {mapped_length!r}")
RuntimeError: cannot expand field mapped to length 0
[2024-10-21, 10:54:52 UTC] {taskinstance.py:2953} ERROR - Unable to unmap task to determine if we need to send an alert email
[2024-10-21, 10:54:52 UTC] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=dtm_failure, task_id=print_args, run_id=manual__2024-10-21T10:54:27.291897+00:00, map_index=0, execution_date=20241021T105427, start_date=20241021T105451, end_date=20241021T105452
[2024-10-21, 10:54:52 UTC] {standard_task_runner.py:110} ERROR - Failed to execute job 13517 for task print_args (cannot expand field mapped to length 0; 341)
```

To emphasise, this issue only occurs when a zero-length map is encountered on `try_number > 1`. When `try_number = 1` and there exists a zero-length map, this issue does not occur.

### What you think should happen instead?

There shouldn't be an exception raised. Looking at the logs above, it seems to be a bug with the handling of mapped tasks as it relates to sending alerts to emails.

### How to reproduce

1. Create the following DAG with this code:

   ```python
   from airflow import DAG
   from airflow.operators.python import PythonOperator


   def generate_args() -> list[dict]:
       from airflow.operators.python import get_current_context

       context = get_current_context()
       is_first_try = context["ti"].try_number == 1
       return [{"foo": f"bar_{idx}"} for idx in range(5)] if is_first_try else []


   with DAG(
       "dtm_failure",
       description="Demonstrate DAG failure with zero-length mapped tasks on subsequent tries",
       schedule=None,
   ):
       task_generate_args = PythonOperator(
           task_id="generate_args",
           python_callable=generate_args,
       )

       task_print_args = PythonOperator.partial(
           task_id="print_args",
           python_callable=lambda foo: print(f"Arg: {foo}"),
       ).expand(op_kwargs=task_generate_args.output)
   ```

2. On your Airflow deployment, manually trigger a run of the `dtm_failure` DAG.
3. The first try will succeed and you should see the following:

   <img width="1452" alt="Screenshot 2024-10-21 at 7 13 53 PM" src="https://github.com/user-attachments/assets/1283732d-1f4b-4363-9f3c-ef13e65eabbd">

4. Select `Clear > Clear existing tasks` to retry the entire DAG run.
5. The second try will fail and you should see the following:

   <img width="1442" alt="Screenshot 2024-10-21 at 7 14 28 PM" src="https://github.com/user-attachments/assets/10a2e99d-5d8b-4d1b-8a58-d2a43309a64e">

6. Select the failed mapped task to view its logs.

### Operating System

Debian 11

### Versions of Apache Airflow Providers

_No response_

### Deployment

Other Docker-based deployment

### Deployment details

- **Python version:** 3.11.4

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
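
### Illustrative model of the failure (not Airflow code)

To make the reported sequence easier to reason about, here is a small pure-Python sketch of what the logs suggest is happening. It does not import Airflow and is not Airflow's implementation. The function names `resolve_for_map_index` and `simulate_try` are hypothetical, and the premise that a task instance with `map_index=0` from the first try is executed again after clearing is an assumption drawn from the `--map-index 0` argument and the `mapped to length 0` error in the log above. Only the error message text is taken from the traceback.

```python
def resolve_for_map_index(upstream_output: list[dict], map_index: int) -> dict:
    """Toy stand-in for resolving one mapped task's kwargs (hypothetical helper)."""
    mapped_length = len(upstream_output)
    if mapped_length < 1:
        # Same message as the RuntimeError in the reported traceback.
        raise RuntimeError(f"cannot expand field mapped to length {mapped_length!r}")
    return upstream_output[map_index]


def simulate_try(upstream_output: list[dict], existing_map_indexes: list[int]) -> dict[int, str]:
    """Run every already-existing map index against the current upstream output."""
    outcomes: dict[int, str] = {}
    for map_index in existing_map_indexes:
        try:
            resolve_for_map_index(upstream_output, map_index)
            outcomes[map_index] = "success"
        except RuntimeError as exc:
            outcomes[map_index] = f"failed: {exc}"
    return outcomes


if __name__ == "__main__":
    # Try 1: upstream returns five items, so five map indexes exist and succeed.
    first_try_output = [{"foo": f"bar_{idx}"} for idx in range(5)]
    print(simulate_try(first_try_output, list(range(5))))

    # Try 2 (after clearing): upstream returns [], but map_index=0 is still run
    # (assumption based on the log), so resolution fails instead of skipping.
    print(simulate_try([], [0]))
```

In this model the second call fails with the same message as the report, whereas the documented behaviour would be for the mapped task to end up `SKIPPED`.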
