szeswee opened a new issue, #43214:
URL: https://github.com/apache/airflow/issues/43214

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.9.2
   
   ### What happened?
   
   The [Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#automatically-skipping-zero-length-maps) describe the following behaviour for zero-length maps in Dynamic Task Mapping:
   
   > If the input is empty (zero length), no new tasks will be created and the mapped task will be marked as `SKIPPED`.
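
As a rough illustration of the documented rule, the sketch below models it in plain Python. Each input element yields one task instance, indexed from 0, and an empty input yields `SKIPPED`. The `expand` function and its return shape are hypothetical; they are not Airflow's API or implementation.

```python
from typing import Sequence, Union


def expand(upstream_output: Sequence[dict]) -> Union[str, list[int]]:
    """Hypothetical model of the documented zero-length rule (not Airflow code).

    Returns the map indexes that would be created, or "SKIPPED" when the
    upstream output is empty.
    """
    if len(upstream_output) == 0:
        # Documented behaviour: no new tasks, mapped task marked SKIPPED.
        return "SKIPPED"
    # One mapped task instance per element, with map_index 0..n-1.
    return list(range(len(upstream_output)))


print(expand([{"foo": "bar_0"}, {"foo": "bar_1"}]))  # [0, 1]
print(expand([]))  # SKIPPED
```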
   
   The above-mentioned behaviour is indeed observed when a mapped task is first executed as part of a new DAG run (i.e. `try_number = 1`).
   
   However, on subsequent tries (i.e. `try_number > 1`), the mapped task instead raises the following exception:
   
   ```
   [2024-10-21, 10:54:51 UTC] {taskinstance.py:2306} INFO - Starting attempt 2 of 2
   [2024-10-21, 10:54:51 UTC] {taskinstance.py:2330} INFO - Executing <Mapped(PythonOperator): print_args> on 2024-10-21 10:54:27.291897+00:00
   [2024-10-21, 10:54:51 UTC] {standard_task_runner.py:63} INFO - Started process 341 to run task
   [2024-10-21, 10:54:51 UTC] {standard_task_runner.py:90} INFO - Running: ['airflow', 'tasks', 'run', 'dtm_failure', 'print_args', 'manual__2024-10-21T10:54:27.291897+00:00', '--job-id', '13517', '--raw', '--subdir', 'DAGS_FOLDER/dtm_failure.py', '--cfg-path', '/tmp/tmpahwek3h8', '--map-index', '0']
   [2024-10-21, 10:54:51 UTC] {standard_task_runner.py:91} INFO - Job 13517: Subtask print_args
   [2024-10-21, 10:54:51 UTC] {task_command.py:426} INFO - Running <TaskInstance: dtm_failure.print_args manual__2024-10-21T10:54:27.291897+00:00 map_index=0 [running]> on host 172.21.174.13
   [2024-10-21, 10:54:52 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2479, in _run_raw_task
       self._execute_task_with_callbacks(context, test_mode, session=session)
     File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2633, in _execute_task_with_callbacks
       task_orig = self.render_templates(context=context, jinja_env=jinja_env)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3094, in render_templates
       original_task.render_template_fields(context, jinja_env)
     File "/usr/local/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 829, in render_template_fields
       mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/models/mappedoperator.py", line 688, in _expand_mapped_kwargs
       return self._get_specified_expand_input().resolve(context, session)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 202, in resolve
       data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 202, in <dictcomp>
       data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 182, in _expand_mapped_field
       found_index = _find_index_for_this_field(map_index)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/models/expandinput.py", line 176, in _find_index_for_this_field
       raise RuntimeError(f"cannot expand field mapped to length {mapped_length!r}")
   RuntimeError: cannot expand field mapped to length 0
   [2024-10-21, 10:54:52 UTC] {taskinstance.py:2953} ERROR - Unable to unmap task to determine if we need to send an alert email
   [2024-10-21, 10:54:52 UTC] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=dtm_failure, task_id=print_args, run_id=manual__2024-10-21T10:54:27.291897+00:00, map_index=0, execution_date=20241021T105427, start_date=20241021T105451, end_date=20241021T105452
   [2024-10-21, 10:54:52 UTC] {standard_task_runner.py:110} ERROR - Failed to execute job 13517 for task print_args (cannot expand field mapped to length 0; 341)
   ```
   
   ### What you think should happen instead?
   
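   No exception should be raised. As on the first try, a mapped task whose upstream input is empty should be marked `SKIPPED`. In the logs above, the `RuntimeError` is raised while rendering the template fields of the mapped task (`map_index=0`). Airflow then also reports that it is unable to unmap the task to decide whether to send an alert email. This seems to be a bug in the handling of mapped tasks, and it also affects sending alert emails.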
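
The logs show a task instance with `map_index=0` being rendered against an upstream input of length 0. The sketch below is a simplified, hypothetical model of why such a lookup cannot succeed. Only the error message comes from the traceback; `find_index_for_field` and `resolve_or_skip` are illustrative names, not Airflow's code. It places the failing lookup beside the behaviour the docs describe: check for an empty input and skip before resolving any index.

```python
from typing import Union


def find_index_for_field(map_index: int, mapped_length: int) -> int:
    """Hypothetical, simplified model of the lookup that fails in the traceback.

    Not Airflow's implementation: it only shows that no map_index can select
    an element from a zero-length input.
    """
    if mapped_length == 0:
        # Same message as the RuntimeError in the logs above.
        raise RuntimeError(f"cannot expand field mapped to length {mapped_length!r}")
    if not 0 <= map_index < mapped_length:
        raise IndexError(f"map_index {map_index} is out of range for length {mapped_length}")
    return map_index


def resolve_or_skip(map_index: int, mapped_length: int) -> Union[str, int]:
    """Hypothetical expected behaviour: check for an empty input before resolving."""
    if mapped_length == 0:
        # Follows the documented zero-length rule instead of raising.
        return "SKIPPED"
    return find_index_for_field(map_index, mapped_length)


# Observed on attempt 2: map_index=0 resolved against an input of length 0.
try:
    find_index_for_field(map_index=0, mapped_length=0)
except RuntimeError as exc:
    print(exc)  # cannot expand field mapped to length 0

print(resolve_or_skip(map_index=0, mapped_length=0))  # SKIPPED
```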
   
   ### How to reproduce
   
   1. Create the following DAG with this code:
      ```python
      from airflow import DAG
      from airflow.operators.python import PythonOperator


      def generate_args() -> list[dict]:
          from airflow.operators.python import get_current_context

          context = get_current_context()
          is_first_try = context["ti"].try_number == 1

          # Five kwargs dicts on the first try, an empty list on every later try.
          return [{"foo": f"bar_{idx}"} for idx in range(5)] if is_first_try else []


      with DAG(
          "dtm_failure",
          description="Demonstrate DAG failure with zero-length mapped tasks on subsequent tries",
          schedule=None,
      ):
          task_generate_args = PythonOperator(
              task_id="generate_args",
              python_callable=generate_args,
          )

          # One print_args task instance per dict returned by generate_args.
          task_print_args = PythonOperator.partial(
              task_id="print_args",
              python_callable=lambda foo: print(f"Arg: {foo}"),
          ).expand(op_kwargs=task_generate_args.output)
      ```
   2. On your Airflow deployment, manually trigger a run of the `dtm_failure` DAG.
   3. The first try will succeed and you should see the following: <img width="1452" alt="Screenshot 2024-10-21 at 7 13 53 PM" src="https://github.com/user-attachments/assets/1283732d-1f4b-4363-9f3c-ef13e65eabbd">
   4. Select `Clear > Clear existing tasks` to retry the entire DAG run.
   5. The second try will fail and you should see the following: <img width="1442" alt="Screenshot 2024-10-21 at 7 14 28 PM" src="https://github.com/user-attachments/assets/10a2e99d-5d8b-4d1b-8a58-d2a43309a64e">
   6. Select the failed mapped task to view its logs.
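
The reproduction depends on the upstream task returning different output on different tries. Its logic can be checked outside Airflow with `generate_args_for_try`, a hypothetical helper that takes the try number as a parameter instead of reading it via `get_current_context()`:

```python
def generate_args_for_try(try_number: int) -> list[dict]:
    """Same logic as generate_args in the DAG above, minus the Airflow context."""
    is_first_try = try_number == 1
    # Five kwargs dicts on the first try, an empty list on every later try.
    return [{"foo": f"bar_{idx}"} for idx in range(5)] if is_first_try else []


print(len(generate_args_for_try(1)))  # 5
print(generate_args_for_try(2))  # []
```

After `Clear > Clear existing tasks`, `generate_args` therefore returns an empty list. `print_args` then receives a zero-length input, and per the documentation quoted above it should be marked `SKIPPED` rather than failing.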
   
   
   ### Operating System
   
   Debian 11
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   - **Python version:** 3.11.4
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
