xuganyu96 opened a new issue, #32706:
URL: https://github.com/apache/airflow/issues/32706

   ### Apache Airflow version
   
   2.6.3
   
   ### What happened
   
   # Context
   When the backend database shuts down (for maintenance, for example), Airflow 
scheduler's main scheduler loop crashes, but the scheduler process does not 
exit. In my company's setup, the scheduler process is monitored by 
`supervisord`, but since the scheduler process does not exit, `supervisord` did 
not pick up on the scheduler failure, causing prolonged scheduler outage.
   
   # Root cause
   In the `airflow/cli/commands/scheduler_command.py`, the main function call 
of the `airflow scheduler` command is the `_run_scheduler_job` function. When 
the `_run_scheduler_job` function is called, depending on the configuration, 
two sub-processes `serve_logs` and/or `health_check` may be started. The life 
cycle of these two sub-processes are managed by a context manager, so that when 
the context exits, the two sub-processes are terminated by the context managers:
   
   ```python
   def _run_scheduler_job(job_runner: SchedulerJobRunner, *, skip_serve_logs: 
bool) -> None:
       InternalApiConfig.force_database_direct_access()
       enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
       with _serve_logs(skip_serve_logs), 
_serve_health_check(enable_health_check):
           run_job(job=job_runner.job, execute_callable=job_runner._execute)
   
   @contextmanager
   def _serve_logs(skip_serve_logs: bool = False):
       """Starts serve_logs sub-process."""
       from airflow.utils.serve_logs import serve_logs
   
       sub_proc = None
       executor_class, _ = ExecutorLoader.import_default_executor_cls()
       if executor_class.serve_logs:
           if skip_serve_logs is False:
               sub_proc = Process(target=serve_logs)
               sub_proc.start()
       yield
       if sub_proc:
           sub_proc.terminate()
   
   
   @contextmanager
   def _serve_health_check(enable_health_check: bool = False):
       """Starts serve_health_check sub-process."""
       sub_proc = None
       if enable_health_check:
           sub_proc = Process(target=serve_health_check)
           sub_proc.start()
       yield
       if sub_proc:
           sub_proc.terminate()
   ```
   
   The mis-behavior happens when `run_job` raises unhandled exception. The 
exception takes over the control flow, and the context managers will not 
properly exit. When the main Python process tries to exit, the 
`multiprocessing` module tries to terminate all child processes 
(https://github.com/python/cpython/blob/1e1f4e91a905bab3103250a3ceadac0693b926d9/Lib/multiprocessing/util.py#L320C43-L320C43)
 by first calling `join()`. Because the sub-processes `serve_logs` and/or 
`health_check` are never terminated, calling `join()` on them will hang 
indefinitely, thus causing the zombie state.
   
   Note that this behavior was introduced since 2.5.0 (2.4.3 does not have this 
issue) when the two sub-processes are not managed with context manager, and the 
scheduler job is placed inside a try-catch-finally block.
   
   ### What you think should happen instead
   
   The scheduler process should never hang. If something went wrong, such as a 
database disconnect, the scheduler should simply crash, and let whoever manages 
the scheduler process handle respawn.
   
   As to how this should be achieved, I think the best way is to place 
`run_job` inside a try-catch block so that any exception can be caught and 
gracefully handled, although I am open to feedback.
   
   ### How to reproduce
   
   # To reproduce the scheduler zombie state
   Start an Airflow cluster with breeze:
   
   ```
   breeze start-airflow --python 3.9 --backend postgres [with any version at or 
later than 2.5.0]
   ```
   
   After the command opens the `tmux` windows, stop the `postgres` container 
`docker stop docker-compose-postgres-1`
   
   The webserver will not do anything. The triggerer should correctly crash and 
exit. The scheduler will crash but not exit.
   
   # To reproduce the context manager's failure to exit
   ```python
   from multiprocessing import Process
   from contextlib import contextmanager
   import time
   
   def busybox():
       time.sleep(24 * 3600)  # the entire day
   
   @contextmanager
   def some_resource():
       subproc = Process(target=busybox)
       subproc.start()
       print(f"Sub-process {subproc} started")
       yield
       subproc.terminate()
       subproc.join()
       print(f"Sub-process {subproc} terminated")
   
   def main():
       with some_resource():
           raise Exception("Oops")
   
   
   if __name__ == "__main__":
       main()
   ```
   
   ### Operating System
   
   MacOS with Docker Desktop
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to