Gollum999 opened a new issue, #25698:
URL: https://github.com/apache/airflow/issues/25698
### Apache Airflow version
2.3.3
### What happened
I was backfilling some DAGs that use dynamic tasks when I got an exception
like the following:
```
Traceback (most recent call last):
  File "/opt/conda/envs/production/bin/airflow", line 11, in <module>
    sys.exit(main())
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/commands/dag_command.py", line 107, in dag_backfill
    dag.run(
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dag.py", line 2288, in run
    job.run()
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 244, in run
    self._execute()
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 847, in _execute
    self._execute_dagruns(
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 737, in _execute_dagruns
    processed_dag_run_dates = self._process_backfill_task_instances(
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 612, in _process_backfill_task_instances
    for node, run_id, new_mapped_tis, max_map_index in self._manage_executor_state(
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 270, in _manage_executor_state
    new_tis, num_mapped_tis = node.expand_mapped_task(ti.run_id, session=session)
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 614, in expand_mapped_task
    operator.mul, self._resolve_map_lengths(run_id, session=session).values()
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 600, in _resolve_map_lengths
    raise RuntimeError(f"Failed to populate all mapping metadata; missing: {keys}")
RuntimeError: Failed to populate all mapping metadata; missing: 'x'
```
Digging further, it appears this always happens when the task used as input to an `.expand()` raises an exception. Airflow does not handle that exception gracefully the way it does for exceptions in "normal" tasks, which can lead to further errors from deeper within Airflow. And because this is not a "typical" failure case, options like `--rerun-failed-tasks` also do not work as expected.
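For illustration, here is a rough sketch (not Airflow's actual internals; the function and parameter names are made up) of why the error surfaces the way it does: to expand a mapped task, the length of each `.expand()` input has to be resolved from the upstream task's result, and an upstream task that raised never produced a result, so there is no length to resolve for that key.

```python
def resolve_map_lengths(upstream_results: dict) -> dict:
    """Hypothetical sketch of mapped-task expansion: each .expand() keyword
    needs a length from its upstream task's result. A key whose upstream
    task failed has no result (None here), so expansion cannot proceed."""
    missing = {key for key, value in upstream_results.items() if value is None}
    if missing:
        # This is the boundary the backfill hits instead of a normal task failure.
        raise RuntimeError(f"Failed to populate all mapping metadata; missing: {missing}")
    return {key: len(value) for key, value in upstream_results.items()}
```

With a healthy upstream, `resolve_map_lengths({'x': ['a', 'b', 'c']})` yields `{'x': 3}`; with a failed upstream (`{'x': None}`) it raises the `RuntimeError` seen in the traceback rather than being recorded as an ordinary task failure.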
### What you think should happen instead
Airflow should fail gracefully if exceptions are raised in dynamic task
generators.
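As a user-side stopgap (a hypothetical workaround, not an official fix, and one that changes semantics: the mapped task expands to zero instances instead of the run failing), the generator task could trap its own failure before returning:

```python
import logging

logger = logging.getLogger(__name__)


def safe_expand_input(compute):
    """Hypothetical guard for a dynamic-task generator: call the real
    generator and fall back to an empty list on failure, so the downstream
    mapped task expands to zero instances instead of leaving the mapping
    metadata unpopulated."""
    try:
        return compute()
    except Exception:
        logger.exception("expand() input generator failed; expanding to zero tasks")
        return []
```

Inside a `@task`-decorated generator this would be `return safe_expand_input(real_generator)`; it merely papers over the issue, since the retry machinery still never sees the original exception.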
### How to reproduce
```
#!/usr/bin/env python3
import datetime
import logging

from airflow.decorators import dag, task

logger = logging.getLogger(__name__)


@dag(
    schedule_interval='@daily',
    start_date=datetime.datetime(2022, 8, 12),
    default_args={
        'retries': 5,
        'retry_delay': 5.0,
    },
)
def test_backfill():
    @task
    def get_tasks(ti=None):
        logger.info(f'{ti.try_number=}')
        if ti.try_number < 3:
            raise RuntimeError('')
        return ['a', 'b', 'c']

    @task
    def do_stuff(x=None, ti=None):
        logger.info(f'do_stuff: {x=}, {ti.try_number=}')
        if ti.try_number < 3:
            raise RuntimeError('')

    do_stuff.expand(x=do_stuff.expand(x=get_tasks()))
    do_stuff() >> do_stuff()


dag = test_backfill()

if __name__ == '__main__':
    dag.cli()
```
```
airflow dags backfill test_backfill -s 2022-08-05 -e 2022-08-07 --rerun-failed-tasks
```
### Operating System
CentOS Stream 8
### Versions of Apache Airflow Providers
None
### Deployment
Other
### Deployment details
Standalone
### Anything else
I was able to reproduce this both with SQLite + `SequentialExecutor` as well
as with Postgres + `LocalExecutor`.
I haven't yet been able to reproduce this outside of `backfill` mode.
Possibly related since they mention the same exception text:
* #23533
* #23642
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)