smalsale opened a new issue, #32700:
URL: https://github.com/apache/airflow/issues/32700
### Apache Airflow version
2.6.3
### What happened
I am seeing the following error in the latest release of Airflow when using
dynamic task mapping. I have checked the input data multiple times and it is
never None.
The error is as follows:
```
[2023-06-21, 03:57:22 UTC] {{taskinstance.py:1824}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1407, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1531, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2179, in render_templates
    original_task.render_template_fields(context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 691, in render_template_fields
    mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context, session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py", line 512, in _expand_mapped_kwargs
    op_kwargs, resolved_oids = super()._expand_mapped_kwargs(context, session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 559, in _expand_mapped_kwargs
    return self._get_specified_expand_input().resolve(context, session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/expandinput.py", line 200, in resolve
    data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/expandinput.py", line 200, in <dictcomp>
    data = {k: self._expand_mapped_field(k, v, context, session=session) for k, v in self.value.items()}
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/expandinput.py", line 186, in _expand_mapped_field
    raise TypeError(f"can't map over value of type {type(value)}")
TypeError: can't map over value of type <class 'NoneType'>
```
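The last frame raises when the value resolved for a mapped argument is neither a list-like sequence nor a dict. The message reports the type of the whole resolved upstream value, not of an individual element. The standalone sketch below is a simplified approximation of that check, not Airflow's actual implementation. The function name `expand_mapped_field` is illustrative, and the sketch assumes a single expanded argument, so the task's map index is used directly.

```python
from collections.abc import Sequence
from typing import Any


def expand_mapped_field(value: Any, map_index: int) -> Any:
    """Simplified stand-in for the per-field check a mapped task runs at render time.

    Assumption: only one argument is expanded, so map_index indexes `value` directly.
    """
    if isinstance(value, Sequence):
        # A list (e.g. the list of flight dicts returned by `validate`) is indexed.
        return value[map_index]
    if isinstance(value, dict):
        # A dict is mapped over its (key, value) pairs in insertion order.
        for i, item in enumerate(value.items()):
            if i == map_index:
                return item
        raise IndexError(f"index {map_index} is over mapped length")
    # Anything else, including a resolved value of None, is rejected.
    raise TypeError(f"can't map over value of type {type(value)}")


flights = [{"flight_id": 123}, {"flight_id": 456}]
print(expand_mapped_field(flights, 1))  # {'flight_id': 456}

try:
    expand_mapped_field(None, 0)
except TypeError as exc:
    print(exc)  # can't map over value of type <class 'NoneType'>
```

Read this way, the error says that the mapped `calculate` instance saw `None` as the entire upstream value when rendering. It does not say that one of the flight dicts was `None`.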
The code looks like the following. We are executing 300 tasks in parallel, and
there are usually …
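```python
from datetime import datetime, timedelta
from typing import Dict, List

from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.weight_rule import WeightRule

# RETRY_DELAY_SEC, FLIGHT_POOL, SNOWFLAKE_HOOK, build and upsert_records
# are not shown in this snippet.


@dag(
    schedule_interval=None,
    dag_id="consumer",
    start_date=datetime(2022, 11, 11),
    dagrun_timeout=timedelta(minutes=45),
    catchup=False,
    max_active_runs=3,
)
def consumer():
    @task(
        task_id="validate",
        retries=1,
        retry_delay=timedelta(seconds=RETRY_DELAY_SEC),
        pool=FLIGHT_POOL,
        weight_rule=WeightRule.UPSTREAM,
    )
    def validate(**kwargs) -> List[Dict]:
        payload: dict = {key: value for key, value in list(kwargs["dag_run"].conf.items())}
        # payload["flights"] is a list of dicts:
        # [ {"flight_id": 123},
        #   {"flight_id": 456}
        # ]
        # Iterating a dict yields its keys (strings for a JSON conf), so for any
        # non-empty payload the all(...) check is False; only an empty conf is rejected.
        if not payload or all(isinstance(x, dict) for x in payload):
            raise AirflowFailException(f"Bad Config Submitted: {kwargs['dag_run'].conf}")
        return payload["flights"]

    @task(
        task_id="calculate",
        retries=1,
        retry_delay=timedelta(seconds=30),
        pool=FLIGHT_POOL,
        weight_rule=WeightRule.UPSTREAM,
        show_return_value_in_logs=False,
    )
    def calculate(flights: Dict, **kwargs):
        # Each mapped instance receives one dict from the list returned by `validate`.
        # Inside consumer(), the name `calculate` refers to this task itself, so a
        # module-level helper with the same name would be shadowed on the next line.
        return calculate([build(flights["flight_id"])])

    @task(
        task_id="merge",
        retries=2,
        retry_delay=timedelta(seconds=30),
        trigger_rule=TriggerRule.ALL_DONE,
        pool=FLIGHT_POOL,
        weight_rule=WeightRule.UPSTREAM,
    )
    def merge(records, **kwargs):
        upsert_records(records, SNOWFLAKE_HOOK)

    # One `calculate` instance is created per element of the list returned by `validate`.
    merge(records=calculate.expand(flights=validate()))


# Calling the decorated function registers the DAG.
consumer()
```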
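To narrow down whether the submitted conf could ever make `validate` return `None`, the check can inspect the `flights` list itself rather than the keys of the conf dict. Below is a hypothetical helper; `extract_flights` is an illustrative name. It raises `ValueError` instead of `AirflowFailException` so that it runs without Airflow installed. It rejects a missing, `None`, empty, or malformed `flights` value before that value can reach `calculate.expand`.

```python
from typing import Any, Dict, List, Optional


def extract_flights(conf: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return conf["flights"] only if it is a non-empty list of dicts with a flight_id."""
    if not conf:
        raise ValueError(f"Bad Config Submitted: {conf}")
    flights = conf.get("flights")
    # Validate the list that will be mapped over, not the keys of the conf dict.
    if not isinstance(flights, list) or not flights:
        raise ValueError(f"Bad Config Submitted: {conf}")
    if not all(isinstance(f, dict) and "flight_id" in f for f in flights):
        raise ValueError(f"Bad Config Submitted: {conf}")
    return flights
```

Inside the task, `validate` could call `extract_flights(kwargs["dag_run"].conf)` and re-raise any `ValueError` as `AirflowFailException`. With that guard in place, a bad conf makes `validate` fail rather than return `None`. If the `TypeError` still appears in `calculate`, that would suggest the `None` is not coming from the submitted conf.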
### What you think should happen instead
We should not get this error, since none of the entries we map over is None.
### How to reproduce
This issue happens intermittently when 300 tasks are scheduled.
### Operating System
airflow:2.6.1-python3.9
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)