Craig-Chatfield opened a new issue, #24027:
URL: https://github.com/apache/airflow/issues/24027

   ### Apache Airflow version
   
   2.3.1 (latest released)
   
   ### What happened
   
   If you set `depends_on_past=True` on a task that utilises dynamic task mapping and then run that task, the scheduler shuts down and logs the following error:
   
   ```
   AttributeError: 'MappedOperator' object has no attribute 'ignore_first_depends_on_past'
   [40] [INFO] Shutting down: Master
   ```
   The scheduler then restarts, logs the same error and restarts again. It gets stuck in this loop, restarting indefinitely and never actually executing the task.
   
   For the same DAG, if you set `depends_on_past=False`, the task executes correctly.
   
   This issue occurs on Airflow versions 2.3.0-5 and 2.3.1.
   
   ### What you think should happen instead
   
   Airflow should check the state of every task instance in the previous run's mapped collection and then decide whether the current run's mapped task instances should be executed.
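A minimal sketch of that expected behavior, assuming the mapped check simply applies the existing non-mapped `depends_on_past` rule (the previous run's instance must be in `success` or `skipped`) to every map index. The function name and the plain-string states are hypothetical and are not part of Airflow's API.

```python
from typing import Mapping, Optional

# States assumed to satisfy depends_on_past, mirroring the non-mapped rule.
PASSING_STATES = frozenset({"success", "skipped"})


def mapped_depends_on_past_met(previous_states: Optional[Mapping[int, str]]) -> bool:
    """Hypothetical check: may the current run's mapped instances start?

    previous_states maps map_index -> state for the same task in the
    previous DAG run, or is None when there is no previous run.
    """
    if previous_states is None:
        # First run: there is nothing in the past to wait for.
        return True
    # Every mapped instance of the previous run must have passed.
    return all(state in PASSING_STATES for state in previous_states.values())


print(mapped_depends_on_past_met({0: "success", 1: "success"}))  # True
print(mapped_depends_on_past_met({0: "success", 1: "failed"}))   # False
```

Treating an empty mapping as satisfied is a design choice in this sketch: a previous run that expanded into zero instances leaves nothing to wait for.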
   
   ### How to reproduce
   
   The following code is a minimal reproducible example.
   
   - Task `first` executes correctly.
   - Task `second` causes the scheduler to restart.
   
   ```python
   from datetime import datetime, timedelta
   
   from airflow import models
   from airflow.models.baseoperator import chain
   from airflow.decorators import task
   
   
   default_args = {
       'owner': 'airflow',
       'start_date': datetime(2022, 4, 25),
       'depends_on_past': True,
       'retries': 2,
       'retry_delay': timedelta(minutes=5),
   }
   
   dag_name = 'break-scheduler'
   
   @task
   def producer():
       return [1, 2]
   
   @task
   def consumer(arg):
       print(arg)
   
   with models.DAG(
       dag_name,
       default_args=default_args,
       schedule_interval='0 * * * *',
       catchup=True,
       max_active_runs=1,
   ) as dag:
       first = producer()
   
       second = consumer.expand(arg=first)
   
       chain(first, second)
   ```
   
   ### Operating System
   
   docker/debian
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   Full stack trace:
   
   ```
   [2022-05-30 10:55:37,335] {scheduler_job.py:756} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 739, in _execute
       self._run_scheduler_loop()
     File "/usr/local/lib/python3.9/site-packages/astronomer/airflow/version_check/plugin.py", line 29, in run_before
       fn(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 827, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 909, in _do_scheduling
       callback_to_run = self._schedule_dag_run(dag_run, session)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1151, in _schedule_dag_run
       schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 522, in update_state
       info = self.task_instance_scheduling_decisions(session)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 658, in task_instance_scheduling_decisions
       schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
     File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 705, in _get_ready_tis
       if not schedulable.are_dependencies_met(session=session, dep_context=dep_context):
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1147, in are_dependencies_met
       for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1168, in get_failed_dep_statuses
       for dep_status in dep.get_dep_statuses(self, session, dep_context):

   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 8, in <module>
   [2022-05-30 10:55:07 +0000] [40] [INFO] Handling signal: term
       sys.exit(main())
     File "/usr/local/lib/python3.9/site-packages/airflow/__main__.py", line 38, in main
       args.func(args)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 51, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 99, in wrapper
   [2022-05-30 10:55:07 +0000] [42] [INFO] Worker exiting (pid: 42)
       return f(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
       _run_scheduler_job(args=args)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
       job.run()
   [2022-05-30 10:55:07 +0000] [44] [INFO] Worker exiting (pid: 44)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 244, in run
       self._execute()
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 739, in _execute
       self._run_scheduler_loop()
     File "/usr/local/lib/python3.9/site-packages/astronomer/airflow/version_check/plugin.py", line 29, in run_before
       fn(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 827, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 909, in _do_scheduling
       callback_to_run = self._schedule_dag_run(dag_run, session)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1151, in _schedule_dag_run
       schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 522, in update_state
       info = self.task_instance_scheduling_decisions(session)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 658, in task_instance_scheduling_decisions
       schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
     File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 705, in _get_ready_tis
       if not schedulable.are_dependencies_met(session=session, dep_context=dep_context):
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1147, in are_dependencies_met
       for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1168, in get_failed_dep_statuses
       for dep_status in dep.get_dep_statuses(self, session, dep_context):
     File "/usr/local/lib/python3.9/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 95, in get_dep_statuses
       yield from self._get_dep_statuses(ti, session, dep_context)
     File "/usr/local/lib/python3.9/site-packages/airflow/ti_deps/deps/prev_dagrun_dep.py", line 71, in _get_dep_statuses
       if ti.task.ignore_first_depends_on_past:
   AttributeError: 'MappedOperator' object has no attribute 'ignore_first_depends_on_past'

   [2022-05-30 10:55:07 +0000] [40] [INFO] Shutting down: Master
   ```
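The last frame shows `_get_dep_statuses` in `prev_dagrun_dep.py` reading `ti.task.ignore_first_depends_on_past`, an attribute the `MappedOperator` in this version does not define. The sketch below is a simplified, hypothetical model of that check, not Airflow's code: the `Fake*` classes stand in for the real operators, and it assumes the dependency returns early when `depends_on_past` is false, which would be consistent with that setting running cleanly. It also shows one defensive option, a `getattr` lookup that falls back to `False` (an assumed default) instead of raising.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeBaseOperator:
    """Stand-in for a regular operator, which defines the attribute."""
    depends_on_past: bool
    ignore_first_depends_on_past: bool = False


@dataclass
class FakeMappedOperator:
    """Stand-in for the mapped operator: no ignore_first_depends_on_past."""
    depends_on_past: bool


def read_ignore_first(task, defensive: bool = False) -> Optional[bool]:
    """Mimic the order of operations in the failing dependency check.

    Returns None when depends_on_past is off (the check passes before the
    flag is read); otherwise returns ignore_first_depends_on_past.
    """
    if not task.depends_on_past:
        # Early return: the missing attribute is never touched.
        return None
    if defensive:
        # Fall back to an assumed default of False instead of raising.
        return getattr(task, "ignore_first_depends_on_past", False)
    # Direct access raises AttributeError on FakeMappedOperator.
    return task.ignore_first_depends_on_past
```

With `defensive=False`, calling `read_ignore_first(FakeMappedOperator(depends_on_past=True))` raises an `AttributeError` of the same shape as the scheduler log, while `depends_on_past=False` returns `None` without error.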
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

