Craig-Chatfield opened a new issue, #24027:
URL: https://github.com/apache/airflow/issues/24027
### Apache Airflow version
2.3.1 (latest released)
### What happened
If you set
- `'depends_on_past'=True`
on a task that utilises dynamic task mapping and then run that task the
scheduler shut downs, logging the following error:
```
AttributeError: 'MappedOperator' object has no attribute
'ignore_first_depends_on_past'
[40] [INFO] Shutting down: Master
```
The scheduler then restarts, logs the same error, restarts again getting
stuck in this loop, restarting indefinitely and never actually executing the
task.
For the same DAG, if you set:
- `'depends_on_past'=False`
Then it will execute correctly.
This issue occurs on Airflow versions: 2.3.0-5 and 2.3.1
### What you think should happen instead
Airflow should check the status of all tasks in the previous task mapping
collection and then decide whether the current task mapping should be executed
or not.
### How to reproduce
The following code should act as the minimal reproducible case.
- Task: `first` will execute correctly.
- Task: `second` causes the scheduler to restart
```
from datetime import datetime, timedelta
from airflow import models
from airflow.models.baseoperator import chain
from airflow.decorators import task
default_args = {
'owner': 'airflow',
'start_date': datetime(2022, 4, 25),
'depends_on_past': True,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
dag_name = 'break-scheduler'
@task
def producer():
return[1, 2]
@task
def consumer(arg):
print(arg)
with models.DAG(
dag_name,
default_args=default_args,
schedule_interval='0 * * * *',
catchup=True,
max_active_runs=1,
) as dag:
first = producer()
second = consumer.expand(arg=first)
chain(first, second)
```
### Operating System
docker/debian
### Versions of Apache Airflow Providers
_No response_
### Deployment
Astronomer
### Deployment details
_No response_
### Anything else
Full stack trace:
```
[2022-05-30 10:55:37,335] {scheduler_job.py:756} ERROR - Exception when
executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File
"/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line
739, in _execute
self._run_scheduler_loop()
File
"/usr/local/lib/python3.9/site-packages/astronomer/airflow/version_check/plugin.py",
line 29, in run_before
fn(*args, **kwargs)
File
"/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line
827, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File
"/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line
909, in _do_scheduling
callback_to_run = self._schedule_dag_run(dag_run, session)
File
"/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line
1151, in _schedule_dag_run
schedulable_tis, callback_to_run = dag_run.update_state(session=session,
execute_callbacks=False)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py",
line 68, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py",
line 522, in update_state
info = self.task_instance_scheduling_decisions(session)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py",
line 68, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py",
line 658, in task_instance_scheduling_decisions
schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py",
line 705, in _get_ready_tis
if not schedulable.are_dependencies_met(session=session,
dep_context=dep_context):
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py",
line 68, in wrapper
return func(*args, **kwargs)
File
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line
1147, in are_dependencies_met
for dep_status in self.get_failed_dep_statuses(dep_context=dep_context,
session=session):
File
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line
1168, in get_failed_dep_statuses
for dep_status in dep.get_dep_statuses(self, session, dep_context):
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 8, in <module>
[2022-05-30 10:55:07 +0000] [40] [INFO] Handling signal: term
sys.exit(main())
File "/usr/local/lib/python3.9/site-packages/airflow/__main__.py", line
38, in main
args.func(args)
File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py",
line 51, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line
99, in wrapper
[2022-05-30 10:55:07 +0000] [42] [INFO] Worker exiting (pid: 42)
return f(*args, **kwargs)
File
"/usr/local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py",
line 75, in scheduler
_run_scheduler_job(args=args)
File
"/usr/local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py",
line 46, in _run_scheduler_job
job.run()
[2022-05-30 10:55:07 +0000] [44] [INFO] Worker exiting (pid: 44)
File "/usr/local/lib/python3.9/site-packages/airflow/jobs/base_job.py",
line 244, in run
self._execute()
File
"/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line
739, in _execute
self._run_scheduler_loop()
File
"/usr/local/lib/python3.9/site-packages/astronomer/airflow/version_check/plugin.py",
line 29, in run_before
fn(*args, **kwargs)
File
"/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line
827, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File
"/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line
909, in _do_scheduling
callback_to_run = self._schedule_dag_run(dag_run, session)
File
"/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line
1151, in _schedule_dag_run
schedulable_tis, callback_to_run = dag_run.update_state(session=session,
execute_callbacks=False)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py",
line 68, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py",
line 522, in update_state
info = self.task_instance_scheduling_decisions(session)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py",
line 68, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py",
line 658, in task_instance_scheduling_decisions
schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py",
line 705, in _get_ready_tis
if not schedulable.are_dependencies_met(session=session,
dep_context=dep_context):
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py",
line 68, in wrapper
return func(*args, **kwargs)
File
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line
1147, in are_dependencies_met
for dep_status in self.get_failed_dep_statuses(dep_context=dep_context,
session=session):
File
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line
1168, in get_failed_dep_statuses
for dep_status in dep.get_dep_statuses(self, session, dep_context):
File
"/usr/local/lib/python3.9/site-packages/airflow/ti_deps/deps/base_ti_dep.py",
line 95, in get_dep_statuses
yield from self._get_dep_statuses(ti, session, dep_context)
File
"/usr/local/lib/python3.9/site-packages/airflow/ti_deps/deps/prev_dagrun_dep.py",
line 71, in _get_dep_statuses
if ti.task.ignore_first_depends_on_past:
AttributeError: 'MappedOperator' object has no attribute
'ignore_first_depends_on_past'
[2022-05-30 10:55:07 +0000] [40] [INFO] Shutting down: Master
```
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]