smalsale opened a new issue, #32700:
URL: https://github.com/apache/airflow/issues/32700

   ### Apache Airflow version
   
   2.6.3
   
   ### What happened
   
   I am seeing the following error in the latest distribution of Airflow when using dynamic task mapping. I have checked the input data multiple times and it is never None.
   The error is as follows:
   [2023-06-21, 03:57:22 UTC] {{taskinstance.py:1824}} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1407, in _run_raw_task
       self._execute_task_with_callbacks(context, test_mode)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1531, in _execute_task_with_callbacks
       task_orig = self.render_templates(context=context)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2179, in render_templates
       original_task.render_template_fields(context)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 691, in render_template_fields
       mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py", line 512, in _expand_mapped_kwargs
       op_kwargs, resolved_oids = super()._expand_mapped_kwargs(context, session)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 559, in _expand_mapped_kwargs
       return self._get_specified_expand_input().resolve(context, session)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/expandinput.py", line 200, in resolve
       data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/expandinput.py", line 200, in <dictcomp>
       data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/expandinput.py", line 186, in _expand_mapped_field
       raise TypeError(f"can't map over value of type {type(value)}")
   TypeError: can't map over value of type <class 'NoneType'>
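
The final frame of the traceback is a type check on the value being mapped over. A minimal sketch of that kind of check in plain Python is given here for orientation. `expand_mapped_field` is a hypothetical stand-in and not Airflow's code; it assumes the value resolved from the upstream task arrived as `None`, which is one possible cause and is not confirmed by this report.

```python
from collections.abc import Mapping, Sequence


def expand_mapped_field(key, value):
    """Hypothetical stand-in for the type check seen in the traceback.

    Not Airflow's implementation: it only mirrors the final `raise`.
    """
    # Strings are sequences too, but mapping over one is rarely intended,
    # so this sketch rejects them along with None and scalars.
    if isinstance(value, (str, bytes)) or not isinstance(value, (Sequence, Mapping)):
        raise TypeError(f"can't map over value of type {type(value)}")
    return list(value.items()) if isinstance(value, Mapping) else list(value)


# A resolved upstream value of None produces the same message as the log.
try:
    expand_mapped_field("flights", None)
except TypeError as exc:
    message = str(exc)
```

Under this assumption, the error says nothing about the individual map entries; it is raised when the whole collection handed to `expand` resolves to `None`.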
   The code looks like the following. We are executing 300 tasks in parallel and there are usually
   @dag(schedule_interval=None,
        dag_id="consumer",
        start_date=datetime(2022, 11, 11),
        dagrun_timeout=timedelta(minutes=45),
        catchup=False,
        max_active_runs=3)
   def consumer():
   
       @task(
           task_id="validate",
           retries=1,
           retry_delay=timedelta(seconds=RETRY_DELAY_SEC),
           pool=FLIGHT_POOL,
           weight_rule=WeightRule.UPSTREAM
       )
       def validate(**kwargs) -> List[Dict]:
           payload: dict = {key: value for key, value in list(kwargs['dag_run'].conf.items())}
   
           # payload is a list of dicts
           # [ {"flight_id": 123},
           #   {"flight_id": 456}
           # ]
           if not payload or all(isinstance(x, dict) for x in payload):
               raise AirflowFailException(f"Bad Config Submitted: {kwargs['dag_run'].conf}")
   
           return payload["flights"]
   
       @task(
           task_id="calculate",
           retries=1,
           retry_delay=timedelta(seconds=30),
           pool=FLIGHT_POOL,
           weight_rule=WeightRule.UPSTREAM,
           show_return_value_in_logs=False
       )
       def calculate(flights: Dict, **kwargs):
           return calculate([build(flights["flight_id"])])
   
       @task(
           task_id="merge",
           retries=2,
           retry_delay=timedelta(seconds=30),
           trigger_rule=TriggerRule.ALL_DONE,
           pool=FLIGHT_POOL,
           weight_rule=WeightRule.UPSTREAM
       )
       def merge(records, **kwargs):
           upsert_records(records, SNOWFLAKE_HOOK)
   
       merge(records=calculate.expand(flights=validate()))
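
One thing worth checking in `validate`: `payload` is a dict, so `all(isinstance(x, dict) for x in payload)` iterates over its keys, and the guard only raises when the conf is empty. A conf such as `{"flights": null}` would pass it and return `None`. The sketch below shows a check that matches the comment in the code (a list of dicts with a `flight_id`), written in plain Python. `extract_flights` is a hypothetical helper, `ValueError` stands in for `AirflowFailException`, and the conf shape `{"flights": [...]}` is an assumption based on `payload["flights"]`.

```python
from typing import Any, Dict, List


def extract_flights(conf: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return conf["flights"] after checking it is a non-empty list of dicts.

    Hypothetical helper; ValueError stands in for AirflowFailException.
    """
    flights = conf.get("flights") if isinstance(conf, dict) else None
    # Reject a missing key, an explicit null, a non-list, or an empty list.
    if not isinstance(flights, list) or not flights:
        raise ValueError(f"Bad Config Submitted: {conf!r}")
    # Every entry must be a dict that carries a flight_id.
    if not all(isinstance(item, dict) and "flight_id" in item for item in flights):
        raise ValueError(f"Bad Config Submitted: {conf!r}")
    return flights
```

With this check, `extract_flights({"flights": [{"flight_id": 123}]})` returns the list unchanged, while `extract_flights({"flights": None})` raises instead of passing `None` downstream.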
   
   ### What you think should happen instead
   
   We should not get an error, since none of the map entries is None.
   
   ### How to reproduce
   
   This issue happens intermittently when 300 tasks are scheduled!
   
   ### Operating System
   
   airflow:2.6.1-python3.9
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

