SamWheating opened a new issue #22160:
URL: https://github.com/apache/airflow/issues/22160


   ### Apache Airflow version
   
   2.2.2
   
   ### What happened
   
   We had an entire Airflow environment crash recently due to a very rare series of events.
   
   1) A user manually submitted a task to the executor using the `run` button with `ignore_all_deps`
   2) While this task was running, the scheduler crashed
   3) A new scheduler attempted to adopt this task and entered a crash loop with the following stack trace:
   
   ```
   [2022-03-10 17:10:20,386] {scheduler_job.py:655} INFO - Exited execute loop
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/usr/local/lib/python3.9/site-packages/airflow/__main__.py", line 48, in main
       args.func(args)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
       _run_scheduler_job(args=args)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
       job.run()
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 245, in run
       self._execute()
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 628, in _execute
       self._run_scheduler_loop()
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 681, in _run_scheduler_loop
       self.adopt_or_reset_orphaned_tasks()
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1117, in adopt_or_reset_orphaned_tasks
       for attempt in run_with_db_retries(logger=self.log):
     File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 382, in __iter__
       do = self.iter(retry_state=retry_state)
     File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 349, in iter
       return fut.result()
     File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in result
       return self.__get_result()
     File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
       raise self._exception
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1162, in adopt_or_reset_orphaned_tasks
       to_reset = self.executor.try_adopt_task_instances(tis_to_reset_or_adopt)
     File "/usr/local/lib/python3.9/site-packages/airflow/executors/celery_executor.py", line 485, in try_adopt_task_instances
       self.adopted_task_timeouts[ti.key] = ti.queued_dttm + self.task_adoption_timeout
   TypeError: unsupported operand type(s) for +: 'NoneType' and 'datetime.timedelta'
   ```
   
   It turns out that submitting a task directly to the executor can create TaskInstances in the `running` state with `queued_dttm=None`, which then crash the scheduler during the task adoption process.
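The failure reduces to plain `datetime` arithmetic on a `None` value. The sketch below reproduces it outside Airflow; `FakeTaskInstance` is a hypothetical stand-in holding only the fields involved (not the real `TaskInstance` model), and the 600-second timeout is an assumed value for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in: only the fields involved in the crash."""
    state: str
    queued_dttm: Optional[datetime]
    start_date: Optional[datetime]


# Assumed value for illustration; the real one comes from the executor's config.
TASK_ADOPTION_TIMEOUT = timedelta(seconds=600)


def adoption_deadline(ti: FakeTaskInstance) -> datetime:
    # Same arithmetic as the failing line in CeleryExecutor.try_adopt_task_instances.
    return ti.queued_dttm + TASK_ADOPTION_TIMEOUT


# A TI launched from the UI "Run" button: running, but queued_dttm was never set.
ti = FakeTaskInstance(
    state="running",
    queued_dttm=None,
    start_date=datetime(2022, 3, 10, 17, 0, tzinfo=timezone.utc),
)

error_message = None
try:
    adoption_deadline(ti)
except TypeError as err:
    # None + timedelta is unsupported, exactly as in the scheduler traceback.
    error_message = str(err)
    print(f"TypeError: {error_message}")
```

A TI that went through the normal scheduling path has `queued_dttm` set, so the same expression succeeds for it.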
   
   Here's the place where the unhandled exception originates:
   
https://github.com/apache/airflow/blob/9e6769206e124b65d31028a3b7b9047d51fd0be5/airflow/executors/celery_executor.py#L546
   
   And here's where the task is submitted to the executor without updating the `queued_dttm`:
   
https://github.com/apache/airflow/blob/3b9ae4211b379f0ddee4ba3034c9b8e8c2f10707/airflow/www/views.py#L1738-L1743
   
   ### What you expected to happen
   
   A new executor should be able to adopt running task instances, regardless of how they were started.
   
   ### How to reproduce
   
   1) Run the following DAG while using the CeleryExecutor:
   
   ```python
   from datetime import timedelta

   from airflow.models import DAG
   from airflow.operators.bash import BashOperator
   from airflow.utils.dates import days_ago

   dag = DAG(
       'parallel-dag-example',
       start_date=days_ago(1),
       max_active_runs=1,
       dagrun_timeout=timedelta(minutes=60),
       schedule_interval=None,
       # Allow only one running task so the others stay `scheduled`
       # (`concurrency` is the deprecated alias of this parameter).
       max_active_tasks=1,
   )

   for i in range(5):
       BashOperator(
           task_id=f'task_{i}',
           bash_command='sleep 300',
           retries=0,
           dag=dag,
       )
   ```
   
   2) Submit one of the `scheduled` tasks to the executor via the UI (you may need to use `ignore_all_deps`)
   3) Restart the scheduler(s)
   
   This will cause a crashloop of the schedulers until the task completes.
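To check whether a deployment currently has task instances that would trigger this crash loop, one could look for rows that are running but were never marked as queued. The query below is a sketch that assumes Airflow 2.2's `task_instance` metadata table (lowercase state values such as `'running'`, plus a `queued_dttm` column); the in-memory SQLite table is a deliberately simplified stand-in, not the real schema, and the sample rows are made up.

```python
import sqlite3

# Assumed column names from Airflow 2.2's task_instance table; verify them
# against your own metadata DB before running this there.
UNQUEUED_RUNNING_SQL = """
SELECT dag_id, task_id, run_id
FROM task_instance
WHERE state = 'running' AND queued_dttm IS NULL
"""


def find_unqueued_running(conn: sqlite3.Connection) -> list:
    """Return (dag_id, task_id, run_id) for running TIs without queued_dttm."""
    return conn.execute(UNQUEUED_RUNNING_SQL).fetchall()


# Demo on a simplified, in-memory stand-in table with made-up rows.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance "
    "(dag_id TEXT, task_id TEXT, run_id TEXT, state TEXT, queued_dttm TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
    [
        ("parallel-dag-example", "task_0", "manual__1", "running", None),  # started from the UI
        ("parallel-dag-example", "task_1", "manual__1", "running", "2022-03-10T17:00:00+00:00"),
        ("parallel-dag-example", "task_2", "manual__1", "scheduled", None),
    ],
)
print(find_unqueued_running(conn))
```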
   
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   We're using the Celery Executor
   
   ### Anything else
   
   I am currently evaluating a few possible solutions, among them:
   
   1) Updating the line in the celery executor to:
   ```python
   self.adopted_task_timeouts[ti.key] = (ti.queued_dttm or ti.start_date) + self.task_adoption_timeout
   ```
   
   2) Only setting the `adopted_task_timeouts` entry if the task is in the `PENDING` state (adoption timeouts are irrelevant for tasks that are already running):
   ```python
   if state == celery_states.PENDING:
       self.adopted_task_timeouts[ti.key] = (ti.queued_dttm or ti.start_date) + self.task_adoption_timeout
   ```
   
   3) Updating the TaskInstance's `queued_dttm` from the webserver when the task is submitted to the queue (this feels like the most appropriate fix, as it addresses the root cause of the issue).
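The three candidate fixes can be compared side by side on a stand-in object. This is a minimal sketch, not Airflow code: `FakeTaskInstance`, the `option_*` helpers, and the 600-second timeout are hypothetical, `PENDING` mirrors the string value of `celery.states.PENDING`, and `"STARTED"` is used as an example of a Celery state for a task that is already running.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional, Tuple

PENDING = "PENDING"  # same string value as celery.states.PENDING
TASK_ADOPTION_TIMEOUT = timedelta(seconds=600)  # assumed value for illustration

TIKey = Tuple[str, str, str]


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in holding only the fields the fixes touch."""
    key: TIKey
    queued_dttm: Optional[datetime] = None
    start_date: Optional[datetime] = None


def option_1(ti: FakeTaskInstance, timeouts: Dict[TIKey, datetime]) -> None:
    # Fall back to start_date when queued_dttm was never set.
    timeouts[ti.key] = (ti.queued_dttm or ti.start_date) + TASK_ADOPTION_TIMEOUT


def option_2(ti: FakeTaskInstance, state: str, timeouts: Dict[TIKey, datetime]) -> None:
    # Only track an adoption deadline for tasks Celery still reports as PENDING.
    if state == PENDING:
        timeouts[ti.key] = (ti.queued_dttm or ti.start_date) + TASK_ADOPTION_TIMEOUT


def option_3_submit(ti: FakeTaskInstance, now: Optional[datetime] = None) -> None:
    # Root-cause fix: stamp queued_dttm when the webserver hands the TI to the executor.
    ti.queued_dttm = now or datetime.now(timezone.utc)


# A running TI started from the UI: queued_dttm is None, start_date is set.
start = datetime(2022, 3, 10, 17, 0, tzinfo=timezone.utc)
ti = FakeTaskInstance(key=("parallel-dag-example", "task_0", "manual__1"), start_date=start)

timeouts_1: Dict[TIKey, datetime] = {}
option_1(ti, timeouts_1)
print(timeouts_1[ti.key])  # deadline derived from start_date

timeouts_2: Dict[TIKey, datetime] = {}
option_2(ti, "STARTED", timeouts_2)  # already running: no deadline recorded
print(timeouts_2)

option_3_submit(ti, now=start)
print(ti.queued_dttm + TASK_ADOPTION_TIMEOUT)  # the original expression now works
```

In this sketch, options 1 and 2 would still raise `TypeError` if both `queued_dttm` and `start_date` were `None`, while option 3 removes the `None` where it is introduced; it would not, however, change rows already written without a `queued_dttm`.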
   
   Let me know if you have any strong opinions on any of these proposed fixes. 
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

