SamWheating opened a new issue #22160:
URL: https://github.com/apache/airflow/issues/22160
### Apache Airflow version
2.2.2
### What happened
We recently had an entire Airflow environment crash due to a rare sequence of events:
1) A user manually submitted a task to the executor using the `run` button
with `ignore_all_deps`
2) While this task was running, the scheduler crashed
3) A new scheduler attempted to adopt this task and entered a crash loop
with the following stack trace:
```
[2022-03-10 17:10:20,386] {scheduler_job.py:655} INFO - Exited execute loop
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.9/site-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
    _run_scheduler_job(args=args)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 628, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 681, in _run_scheduler_loop
    self.adopt_or_reset_orphaned_tasks()
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1117, in adopt_or_reset_orphaned_tasks
    for attempt in run_with_db_retries(logger=self.log):
  File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 382, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 349, in iter
    return fut.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1162, in adopt_or_reset_orphaned_tasks
    to_reset = self.executor.try_adopt_task_instances(tis_to_reset_or_adopt)
  File "/usr/local/lib/python3.9/site-packages/airflow/executors/celery_executor.py", line 485, in try_adopt_task_instances
    self.adopted_task_timeouts[ti.key] = ti.queued_dttm + self.task_adoption_timeout
TypeError: unsupported operand type(s) for +: 'NoneType' and 'datetime.timedelta'
```
It turns out that submitting a task directly to the executor can create TaskInstances in the `Running` state with `queued_dttm=None`, which then crashes the scheduler during the task adoption process.
Here's the place where the unhandled exception originates:
https://github.com/apache/airflow/blob/9e6769206e124b65d31028a3b7b9047d51fd0be5/airflow/executors/celery_executor.py#L546
And here's where the task is submitted to the executor without updating the
`queued_dttm`:
https://github.com/apache/airflow/blob/3b9ae4211b379f0ddee4ba3034c9b8e8c2f10707/airflow/www/views.py#L1738-L1743
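For reference, the failure reduces to adding a `timedelta` to `None`; here is a minimal standalone illustration (plain Python, not Airflow code, with an illustrative timeout value):
```python
import datetime

# What try_adopt_task_instances effectively does for the manually-submitted
# task: queued_dttm was never set, so the addition raises.
queued_dttm = None
task_adoption_timeout = datetime.timedelta(seconds=600)  # illustrative value

queued_dttm + task_adoption_timeout
# TypeError: unsupported operand type(s) for +: 'NoneType' and 'datetime.timedelta'
```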
### What you expected to happen
A new executor should be able to adopt running task instances, regardless of how they were started.
### How to reproduce
1) Run the following DAG while using the CeleryExecutor:
```python
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.bash import BashOperator  # non-deprecated import path
from airflow.utils.dates import days_ago

dag = DAG(
    'parallel-dag-example',
    start_date=days_ago(1),
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=60),
    schedule_interval=None,
    max_active_tasks=1,  # replaces the deprecated `concurrency` argument
)

# Five long-running tasks; with max_active_tasks=1, only one runs at a time
# and the rest stay in the `scheduled` state.
for i in range(5):
    BashOperator(
        task_id=f'task_{i}',
        bash_command='sleep 300',
        retries=0,
        dag=dag,
    )
```
2) Submit one of the `scheduled` tasks to the executor via the UI (you may need to select `ignore_all_deps`)
3) Restart the scheduler(s)
This will put the schedulers into a crash loop until the task completes. Before restarting, you can confirm the trigger condition with the snippet below.
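A quick check like this (an assumption: run from any Python shell with access to Airflow's metadata database, e.g. on the webserver) should show the manually-submitted task running with `queued_dttm=None`:
```python
from airflow.models import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import State

with create_session() as session:
    running = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "parallel-dag-example",
            TaskInstance.state == State.RUNNING,
        )
        .all()
    )
    for ti in running:
        # The manually-submitted task shows up as running with no queued_dttm
        print(ti.task_id, ti.state, ti.queued_dttm)
```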
### Operating System
Debian GNU/Linux 10 (buster)
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other 3rd-party Helm chart
### Deployment details
We're using the CeleryExecutor
### Anything else
I am currently evaluating a few possible solutions, among them:
1) Updating the line in the celery executor to be:
```python
self.adopted_task_timeouts[ti.key] = (ti.queued_dttm or ti.start_date) + self.task_adoption_timeout
```
2) Only setting the `adopted_task_timeout` if the task is in the pending state (since adoption timeouts are irrelevant for running tasks anyway):
```python
if state == celery_states.PENDING:
    self.adopted_task_timeouts[ti.key] = (ti.queued_dttm or ti.start_date) + self.task_adoption_timeout
```
3) Updating the TaskInstance's `queued_dttm` from the webserver when the task is submitted to the executor, which feels like the most appropriate fix as it actually addresses the root cause of the issue. A rough sketch follows below.
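For option 3, an untested sketch of what the handler in `views.py` (linked above) might do; the surrounding names (`ti`, `executor`, `session`, and the `ignore_*` flags) are taken from the linked lines rather than verified here:
```python
from airflow.utils import timezone

# Record when the task was handed to the executor so that
# try_adopt_task_instances never sees queued_dttm=None.
ti.queued_dttm = timezone.utcnow()
session.merge(ti)
session.commit()

executor.start()
executor.queue_task_instance(
    ti,
    ignore_all_deps=ignore_all_deps,
    ignore_task_deps=ignore_task_deps,
    ignore_ti_state=ignore_ti_state,
)
executor.heartbeat()
```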
Let me know if you have any strong opinions on any of these proposed fixes.
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)