Gollum999 opened a new issue, #25698:
URL: https://github.com/apache/airflow/issues/25698

   ### Apache Airflow version
   
   2.3.3
   
   ### What happened
   
   I was backfilling some DAGs that use dynamic tasks when I got an exception 
like the following:
   
   ```
   Traceback (most recent call last):
     File "/opt/conda/envs/production/bin/airflow", line 11, in <module>
       sys.exit(main())
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/__main__.py", line 38, in main
       args.func(args)
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 51, in command
       return func(*args, **kwargs)
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 99, in wrapper
       return f(*args, **kwargs)
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/commands/dag_command.py", line 107, in dag_backfill
       dag.run(
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dag.py", line 2288, in run
       job.run()
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 244, in run
       self._execute()
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 847, in _execute
       self._execute_dagruns(
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 737, in _execute_dagruns
       processed_dag_run_dates = self._process_backfill_task_instances(
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 612, in _process_backfill_task_instances
       for node, run_id, new_mapped_tis, max_map_index in self._manage_executor_state(
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 270, in _manage_executor_state
       new_tis, num_mapped_tis = node.expand_mapped_task(ti.run_id, session=session)
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 614, in expand_mapped_task
       operator.mul, self._resolve_map_lengths(run_id, session=session).values()
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 600, in _resolve_map_lengths
       raise RuntimeError(f"Failed to populate all mapping metadata; missing: {keys}")
   RuntimeError: Failed to populate all mapping metadata; missing: 'x'
   ```
   
   Digging further, it appears this always happens when the task used as input to an `.expand()` raises an exception. Airflow does not handle this exception gracefully the way it does for exceptions in "normal" tasks, which can lead to further errors from deeper within Airflow. And because this is not a "typical" failure case, options such as `--rerun-failed-tasks` do not work as expected.
   
   ### What you think should happen instead
   
   Airflow should fail gracefully if exceptions are raised in dynamic task generators.
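One possible shape of graceful handling, as a hypothetical sketch only (this helper and its signature are invented for illustration; only the `upstream_failed` state name is real Airflow terminology): instead of letting the `RuntimeError` escape the backfill loop, the missing-metadata case would mark the mapped task instance `upstream_failed`, so the normal retry/rerun machinery applies:

```python
# Hypothetical sketch of graceful handling; expand_or_mark_failed and its
# callbacks are invented names, not Airflow APIs.

def expand_or_mark_failed(resolve_lengths, set_ti_state, run_id):
    """Try to resolve mapped-task lengths; on missing metadata, mark the
    task instance upstream_failed instead of crashing the whole backfill."""
    try:
        return resolve_lengths(run_id)
    except RuntimeError as err:
        if "Failed to populate all mapping metadata" in str(err):
            # Upstream never produced its XCom, so treat this like any
            # other task whose upstream failed.
            set_ti_state(run_id, "upstream_failed")
            return None
        raise  # unrelated errors still propagate
```

With that behavior, `--rerun-failed-tasks` would see an ordinary failed/upstream-failed task instance rather than an aborted backfill.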
   
   ### How to reproduce
   
   ```
   #!/usr/bin/env python3
   
   import datetime
   import logging
   
   from airflow.decorators import dag, task
   
   
   logger = logging.getLogger(__name__)
   
   
   @dag(
       schedule_interval='@daily',
       start_date=datetime.datetime(2022, 8, 12),
       default_args={
           'retries': 5,
           'retry_delay': 5.0,
       },
   )
   def test_backfill():
       @task
       def get_tasks(ti=None):
           logger.info(f'{ti.try_number=}')
           if ti.try_number < 3:  # fail the first two tries, then succeed
               raise RuntimeError('')
           return ['a', 'b', 'c']

       @task
       def do_stuff(x=None, ti=None):
           logger.info(f'do_stuff: {x=}, {ti.try_number=}')
           if ti.try_number < 3:  # fail the first two tries, then succeed
               raise RuntimeError('')

       # mapped tasks fed by a task that fails on its early tries
       do_stuff.expand(x=do_stuff.expand(x=get_tasks()))
       # "normal" task chain for comparison; these retry as expected
       do_stuff() >> do_stuff()
   
   
   dag = test_backfill()
   
   
   if __name__ == '__main__':
       dag.cli()
   ```
   ```
   airflow dags backfill test_backfill -s 2022-08-05 -e 2022-08-07 --rerun-failed-tasks
   ```
   
   ### Operating System
   
   CentOS Stream 8
   
   ### Versions of Apache Airflow Providers
   
   None
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Standalone
   
   ### Anything else
   
   I was able to reproduce this both with SQLite + `SequentialExecutor` and with Postgres + `LocalExecutor`.
   
   I haven't yet been able to reproduce this outside of `backfill` mode.
   
   Possibly related since they mention the same exception text:
   * #23533
   * #23642
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

