xuganyu96 opened a new issue, #32706:
URL: https://github.com/apache/airflow/issues/32706
### Apache Airflow version
2.6.3
### What happened
# Context
When the backend database shuts down (for maintenance, for example), Airflow
scheduler's main scheduler loop crashes, but the scheduler process does not
exit. In my company's setup, the scheduler process is monitored by
`supervisord`, but since the scheduler process does not exit, `supervisord` did
not pick up on the scheduler failure, causing prolonged scheduler outage.
# Root cause
In the `airflow/cli/commands/scheduler_command.py`, the main function call
of the `airflow scheduler` command is the `_run_scheduler_job` function. When
the `_run_scheduler_job` function is called, depending on the configuration,
two sub-processes `serve_logs` and/or `health_check` may be started. The life
cycle of these two sub-processes are managed by a context manager, so that when
the context exits, the two sub-processes are terminated by the context managers:
```python
def _run_scheduler_job(job_runner: SchedulerJobRunner, *, skip_serve_logs: bool) -> None:
    InternalApiConfig.force_database_direct_access()
    enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
    with _serve_logs(skip_serve_logs), _serve_health_check(enable_health_check):
        run_job(job=job_runner.job, execute_callable=job_runner._execute)


@contextmanager
def _serve_logs(skip_serve_logs: bool = False):
    """Starts serve_logs sub-process."""
    from airflow.utils.serve_logs import serve_logs

    sub_proc = None
    executor_class, _ = ExecutorLoader.import_default_executor_cls()
    if executor_class.serve_logs:
        if skip_serve_logs is False:
            sub_proc = Process(target=serve_logs)
            sub_proc.start()
    yield
    if sub_proc:
        sub_proc.terminate()


@contextmanager
def _serve_health_check(enable_health_check: bool = False):
    """Starts serve_health_check sub-process."""
    sub_proc = None
    if enable_health_check:
        sub_proc = Process(target=serve_health_check)
        sub_proc.start()
    yield
    if sub_proc:
        sub_proc.terminate()
```
The misbehavior happens when `run_job` raises an unhandled exception. The
exception propagates out of the `with` block, and because the `yield` in each
context manager is not wrapped in `try`/`finally`, the cleanup code after it
never runs, so the sub-processes are never terminated. When the main Python
process then tries to exit, the `multiprocessing` module attempts to clean up
all child processes
(https://github.com/python/cpython/blob/1e1f4e91a905bab3103250a3ceadac0693b926d9/Lib/multiprocessing/util.py#L320C43-L320C43)
by first calling `join()` on them. Because the `serve_logs` and/or
`health_check` sub-processes are still running, `join()` hangs indefinitely,
leaving the scheduler in a zombie state.
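The skipped-cleanup behavior of generator-based context managers can be shown
in isolation: an exception raised in the `with` body is thrown into the
generator at the `yield` point, so with a bare `yield` the lines after it never
execute, while a `try`/`finally` around the `yield` restores the cleanup. A
minimal, self-contained sketch:

```python
from contextlib import contextmanager

cleanup_ran = []

@contextmanager
def leaky():
    # bare yield: the line below is skipped when the with-body raises,
    # which is exactly how _serve_logs/_serve_health_check leak children
    yield
    cleanup_ran.append("leaky")

@contextmanager
def safe():
    try:
        yield
    finally:
        cleanup_ran.append("safe")  # runs on the exception path too

for cm in (leaky, safe):
    try:
        with cm():
            raise RuntimeError("boom")
    except RuntimeError:
        pass

print(cleanup_ran)  # only the try/finally variant ran its cleanup
```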
Note that this behavior was introduced in 2.5.0 (2.4.3 does not have this
issue), when the two sub-processes started being managed with context managers.
In 2.4.3, the scheduler job was instead placed inside a `try`/`except`/`finally`
block, so the sub-processes were terminated on every exit path.
### What you think should happen instead
The scheduler process should never hang. If something goes wrong, such as a
database disconnect, the scheduler should simply crash and let whatever manages
the scheduler process handle the respawn.
As to how this should be achieved, I think the best way is to place
`run_job` inside a `try`/`except` block so that any exception can be caught and
gracefully handled, although I am open to feedback.
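Whichever layer the fix lands in, the key property is that sub-process
termination must also run on the exception path. As one possible sketch (not
the actual patch), the `_serve_health_check` helper could move its cleanup into
a `finally` clause; a hypothetical `FakeProcess` stand-in replaces
`multiprocessing.Process(target=serve_health_check)` here so the sketch is
self-contained and runnable:

```python
from contextlib import contextmanager


class FakeProcess:
    """Stand-in for multiprocessing.Process so this sketch runs anywhere."""

    def __init__(self):
        self.alive = False

    def start(self):
        self.alive = True

    def terminate(self):
        self.alive = False

    def join(self):
        pass


@contextmanager
def _serve_health_check(enable_health_check: bool = False):
    """Variant of the Airflow helper with the yield inside try/finally."""
    sub_proc = None
    if enable_health_check:
        sub_proc = FakeProcess()  # Process(target=serve_health_check) in Airflow
        sub_proc.start()
    try:
        yield sub_proc  # yields the handle only so the demo can inspect it
    finally:
        # Runs on both the normal and the exception path, so a crashing
        # run_job can no longer leave the sub-process running.
        if sub_proc:
            sub_proc.terminate()
            sub_proc.join()


started = []
try:
    with _serve_health_check(True) as proc:
        started.append(proc)
        raise RuntimeError("simulated run_job crash")
except RuntimeError:
    pass

print(started[0].alive)  # the sub-process was terminated despite the crash
```

The same `try`/`finally` shape would apply to `_serve_logs`.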
### How to reproduce
# To reproduce the scheduler zombie state
Start an Airflow cluster with breeze:
```
breeze start-airflow --python 3.9 --backend postgres
```
(with any Airflow version at or later than 2.5.0)
After the command opens the `tmux` windows, stop the `postgres` container:
`docker stop docker-compose-postgres-1`
The webserver will not react to the outage. The triggerer will correctly crash
and exit. The scheduler will crash but not exit.
# To reproduce the context manager's failure to exit
```python
import time
from contextlib import contextmanager
from multiprocessing import Process


def busybox():
    time.sleep(24 * 3600)  # sleep for the entire day


@contextmanager
def some_resource():
    subproc = Process(target=busybox)
    subproc.start()
    print(f"Sub-process {subproc} started")
    yield
    subproc.terminate()
    subproc.join()
    print(f"Sub-process {subproc} terminated")


def main():
    with some_resource():
        raise Exception("Oops")


if __name__ == "__main__":
    main()
```
### Operating System
macOS with Docker Desktop
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)