yuqian90 opened a new issue #15938: URL: https://github.com/apache/airflow/issues/15938
**Apache Airflow version**: 1.10.13 onwards (any version that picked up #11278, including Airflow 2.0.* and 2.1.*)

**Environment**:

- **Cloud provider or hardware configuration**: Any
- **OS** (e.g. from /etc/os-release): Only tested on Debian Linux, but others may be affected too
- **Kernel** (e.g. `uname -a`): Any
- **Install tools**: Any
- **Others**: Only celery_executor is affected

**What happened**:

This was first reported [here](https://github.com/apache/airflow/issues/7935#issuecomment-839656436).

airflow-scheduler sometimes stops heartbeating and stops scheduling any tasks, with this as the last line in its log. This happens at random times, a few times a week, and more often when the scheduler machine is slow.

```
{scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15
```

Related to #7935

Most likely caused by #11278

**What you expected to happen**:

The scheduler should not become stuck.

**How to reproduce it**:

Here's a small reproducing example of the problem. There's roughly a 1 in 25 chance that it gets stuck, so run it many times to see it happen.
```python
#!/usr/bin/env python3.8
import os
import random
import signal
import time
from multiprocessing import Pool


def send_task_to_executor(arg):
    pass


def _exit_gracefully(signum, frame):
    # Like the scheduler's handler, this only logs; it does not exit the process.
    print(f"{os.getpid()} Exiting gracefully upon receiving signal {signum}")


def register_signals():
    print(f"{os.getpid()} register_signals()")
    signal.signal(signal.SIGINT, _exit_gracefully)
    signal.signal(signal.SIGTERM, _exit_gracefully)
    signal.signal(signal.SIGUSR2, _exit_gracefully)


def reset_signals():
    # Pool initializer: runs in each worker process and restores the default handlers.
    if random.randint(0, 500) == 0:
        # This sleep statement here simulates the machine being busy
        print(f"{os.getpid()} is slow")
        time.sleep(0.1)
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGUSR2, signal.SIG_DFL)


if __name__ == "__main__":
    register_signals()

    task_tuples_to_send = list(range(20))
    sync_parallelism = 15
    chunksize = 5

    # Leaving the with-block calls pool.terminate(), which sends SIGTERM to workers still running.
    with Pool(processes=sync_parallelism, initializer=reset_signals) as pool:
        pool.map(
            send_task_to_executor,
            task_tuples_to_send,
            chunksize=chunksize,
        )
```

The reproducing example above can become stuck, and when it does, its `py-spy dump` looks exactly like that of the stuck airflow scheduler.

`py-spy dump` for the parent `airflow scheduler` process:

```
Python v3.8.7

Thread 0x7FB54794E740 (active): "MainThread"
    poll (multiprocessing/popen_fork.py:27)
    wait (multiprocessing/popen_fork.py:47)
    join (multiprocessing/process.py:149)
    _terminate_pool (multiprocessing/pool.py:729)
    __call__ (multiprocessing/util.py:224)
    terminate (multiprocessing/pool.py:654)
    __exit__ (multiprocessing/pool.py:736)
    _send_tasks_to_celery (airflow/executors/celery_executor.py:331)
    _process_tasks (airflow/executors/celery_executor.py:272)
    trigger_tasks (airflow/executors/celery_executor.py:263)
    heartbeat (airflow/executors/base_executor.py:158)
    _run_scheduler_loop (airflow/jobs/scheduler_job.py:1388)
    _execute (airflow/jobs/scheduler_job.py:1284)
    run (airflow/jobs/base_job.py:237)
    scheduler (airflow/cli/commands/scheduler_command.py:63)
    wrapper (airflow/utils/cli.py:89)
    command (airflow/cli/cli_parser.py:48)
    main (airflow/__main__.py:40)
    <module> (airflow:8)
```

`py-spy dump` for the child `airflow scheduler` process:

```
Python v3.8.7

Thread 16232 (idle): "MainThread"
    __enter__ (multiprocessing/synchronize.py:95)
    get (multiprocessing/queues.py:355)
    worker (multiprocessing/pool.py:114)
    run (multiprocessing/process.py:108)
    _bootstrap (multiprocessing/process.py:315)
    _launch (multiprocessing/popen_fork.py:75)
    __init__ (multiprocessing/popen_fork.py:19)
    _Popen (multiprocessing/context.py:277)
    start (multiprocessing/process.py:121)
    _repopulate_pool_static (multiprocessing/pool.py:326)
    _repopulate_pool (multiprocessing/pool.py:303)
    __init__ (multiprocessing/pool.py:212)
    Pool (multiprocessing/context.py:119)
    _send_tasks_to_celery (airflow/executors/celery_executor.py:330)
    _process_tasks (airflow/executors/celery_executor.py:272)
    trigger_tasks (airflow/executors/celery_executor.py:263)
    heartbeat (airflow/executors/base_executor.py:158)
    _run_scheduler_loop (airflow/jobs/scheduler_job.py:1388)
    _execute (airflow/jobs/scheduler_job.py:1284)
    run (airflow/jobs/base_job.py:237)
    scheduler (airflow/cli/commands/scheduler_command.py:63)
    wrapper (airflow/utils/cli.py:89)
    command (airflow/cli/cli_parser.py:48)
    main (airflow/__main__.py:40)
    <module> (airflow:8)
```
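The `Exiting gracefully upon receiving signal 15` log line is consistent with SIGTERM reaching a process that still has the parent's `_exit_gracefully` handler installed. The sketch below is a simplified, hedged demonstration, not Airflow code: it forks one child with `os.fork()` instead of using `multiprocessing.Pool`, sends it SIGTERM the way `Pool.terminate()` (reached from `Pool.__exit__` in the parent's dump) signals live workers, and reports whether the child survived. The helper `child_survives_sigterm` and its `reset_in_child` flag are hypothetical; the flag stands in for the `reset_signals` initializer having already run. It assumes a POSIX system.

```python
import os
import signal
import sys
import time


def _exit_gracefully(signum, frame):
    # Mirrors the reproducer's handler: it logs the signal but does not exit.
    print(f"{os.getpid()} Exiting gracefully upon receiving signal {signum}", flush=True)


def child_survives_sigterm(reset_in_child: bool) -> bool:
    """Hypothetical helper: fork one child, SIGTERM it, report whether it exited normally."""
    # Installed in the parent before fork(), so the child inherits this handler.
    previous = signal.signal(signal.SIGTERM, _exit_gracefully)
    try:
        sys.stdout.flush()  # avoid duplicating buffered output in the child
        read_fd, write_fd = os.pipe()
        pid = os.fork()
        if pid == 0:  # child process
            os.close(read_fd)
            if reset_in_child:
                # Stands in for the Pool initializer (reset_signals) having already run.
                signal.signal(signal.SIGTERM, signal.SIG_DFL)
            os.write(write_fd, b"x")  # tell the parent the child is ready
            time.sleep(1.0)  # if the handler swallows SIGTERM, the sleep resumes (PEP 475)
            os._exit(0)  # reached only if SIGTERM did not kill the child
        os.close(write_fd)
        os.read(read_fd, 1)  # wait until the child has started
        os.close(read_fd)
        os.kill(pid, signal.SIGTERM)  # roughly what Pool.terminate() does to each live worker
        _, status = os.waitpid(pid, 0)
        return os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
    finally:
        # Only the parent gets here; the child leaves through os._exit().
        signal.signal(signal.SIGTERM, previous if previous is not None else signal.SIG_DFL)


if __name__ == "__main__":
    print(child_survives_sigterm(reset_in_child=False))  # True: child logs signal 15 and keeps running
    print(child_survives_sigterm(reset_in_child=True))  # False: SIG_DFL terminates the child
```

With `reset_in_child=False` the child prints the same "Exiting gracefully" message and keeps running. This is consistent with a pool worker that is signalled before its initializer resets the handler staying alive, which would leave `join()` inside `_terminate_pool` waiting in the parent.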
