yuqian90 opened a new issue #15938:
URL: https://github.com/apache/airflow/issues/15938


   **Apache Airflow version**: 1.10.13 onwards (Any version that picked up 
#11278, including Airflow 2.0.* and 2.1.*)
   
   
   **Environment**:
   - **Cloud provider or hardware configuration**: Any
   - **OS** (e.g. from /etc/os-release): Only tested on Debian Linux, but 
others may be affected too
   - **Kernel** (e.g. `uname -a`): Any
   - **Install tools**: Any
   - **Others**: Only celery_executor is affected
   
   **What happened**:
   This was first reported 
[here](https://github.com/apache/airflow/issues/7935#issuecomment-839656436).
   airflow-scheduler sometimes stops heartbeating and stops scheduling tasks, 
with the line below as the last entry in its log. This happens at random times, 
a few times a week, and more often if the scheduler machine is slow. 
   
   ```
   {scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15
   ```
   
   Related to #7935
   Most likely caused by #11278
   
   **What you expected to happen**: 
   The scheduler should not become stuck.
   
   **How to reproduce it**:
   
   Here's a small example that reproduces the problem. Each run has roughly a 
1 in 25 chance of getting stuck, so run it many times to see it happen.
   
   ```python
   #!/usr/bin/env python3.8
   import os
   import random
   import signal
   import time
   from multiprocessing import Pool
   
   
   def send_task_to_executor(arg):
       pass
   
   
   def _exit_gracefully(signum, frame):
       print(f"{os.getpid()} Exiting gracefully upon receiving signal {signum}")
   
   
   def register_signals():
       print(f"{os.getpid()} register_signals()")
       signal.signal(signal.SIGINT, _exit_gracefully)
       signal.signal(signal.SIGTERM, _exit_gracefully)
       signal.signal(signal.SIGUSR2, _exit_gracefully)
   
   
   def reset_signals():
       # Pool initializer: runs in each worker after it is forked, restoring
       # the default handlers in place of the ones inherited from the parent.
       if random.randint(0, 500) == 0:
           # This sleep statement here simulates the machine being busy
           print(f"{os.getpid()} is slow")
           time.sleep(0.1)
       signal.signal(signal.SIGINT, signal.SIG_DFL)
       signal.signal(signal.SIGTERM, signal.SIG_DFL)
       signal.signal(signal.SIGUSR2, signal.SIG_DFL)
   
   
   if __name__ == "__main__":
       register_signals()
   
       task_tuples_to_send = list(range(20))
       sync_parallelism = 15
       chunksize = 5
   
       with Pool(processes=sync_parallelism, initializer=reset_signals) as pool:
           pool.map(
               send_task_to_executor,
               task_tuples_to_send,
               chunksize=chunksize,
           )
   
   
   ```
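
Since the hang only shows up occasionally, it can help to run the example in a loop with a timeout and count how often it gets stuck. Below is a minimal sketch of such a harness, not part of Airflow. The function name `count_hangs`, the default timeout, and the file name `repro.py` in the usage note are all assumptions made for illustration. It is POSIX-only, like the example itself.

```python
import os
import signal
import subprocess
import sys


def count_hangs(cmd, runs=100, timeout=10.0):
    """Run `cmd` `runs` times; count the runs still alive after `timeout` seconds."""
    hangs = 0
    for _ in range(runs):
        # A new session gives the child its own process group, so a stuck
        # parent and any leftover pool workers can be killed together.
        proc = subprocess.Popen(
            cmd,
            start_new_session=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            hangs += 1
            os.killpg(proc.pid, signal.SIGKILL)
            proc.wait()
    return hangs
```

If the example is saved as `repro.py` (a hypothetical name), `count_hangs([sys.executable, "repro.py"], runs=200)` returns the number of runs that had to be killed. The timeout only needs to be much longer than a normal run, which finishes almost immediately.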
   
   When the example above gets stuck, its `py-spy dump` output looks exactly 
like that of a stuck airflow scheduler:
   
   `py-spy dump` for the parent `airflow scheduler` process
   ```
   Python v3.8.7
   
   Thread 0x7FB54794E740 (active): "MainThread"
       poll (multiprocessing/popen_fork.py:27)
       wait (multiprocessing/popen_fork.py:47)
       join (multiprocessing/process.py:149)
       _terminate_pool (multiprocessing/pool.py:729)
       __call__ (multiprocessing/util.py:224)
       terminate (multiprocessing/pool.py:654)
       __exit__ (multiprocessing/pool.py:736)
       _send_tasks_to_celery (airflow/executors/celery_executor.py:331)
       _process_tasks (airflow/executors/celery_executor.py:272)
       trigger_tasks (airflow/executors/celery_executor.py:263)
       heartbeat (airflow/executors/base_executor.py:158)
       _run_scheduler_loop (airflow/jobs/scheduler_job.py:1388)
       _execute (airflow/jobs/scheduler_job.py:1284)
       run (airflow/jobs/base_job.py:237)
       scheduler (airflow/cli/commands/scheduler_command.py:63)
       wrapper (airflow/utils/cli.py:89)
       command (airflow/cli/cli_parser.py:48)
       main (airflow/__main__.py:40)
       <module> (airflow:8)
   ```
   
   `py-spy dump` for the child `airflow scheduler` process
   
   ```
   Python v3.8.7
   
   Thread 16232 (idle): "MainThread"
       __enter__ (multiprocessing/synchronize.py:95)
       get (multiprocessing/queues.py:355)
       worker (multiprocessing/pool.py:114)
       run (multiprocessing/process.py:108)
       _bootstrap (multiprocessing/process.py:315)
       _launch (multiprocessing/popen_fork.py:75)
       __init__ (multiprocessing/popen_fork.py:19)
       _Popen (multiprocessing/context.py:277)
       start (multiprocessing/process.py:121)
       _repopulate_pool_static (multiprocessing/pool.py:326)
       _repopulate_pool (multiprocessing/pool.py:303)
       __init__ (multiprocessing/pool.py:212)
       Pool (multiprocessing/context.py:119)
       _send_tasks_to_celery (airflow/executors/celery_executor.py:330)
       _process_tasks (airflow/executors/celery_executor.py:272)
       trigger_tasks (airflow/executors/celery_executor.py:263)
       heartbeat (airflow/executors/base_executor.py:158)
       _run_scheduler_loop (airflow/jobs/scheduler_job.py:1388)
       _execute (airflow/jobs/scheduler_job.py:1284)
       run (airflow/jobs/base_job.py:237)
       scheduler (airflow/cli/commands/scheduler_command.py:63)
       wrapper (airflow/utils/cli.py:89)
       command (airflow/cli/cli_parser.py:48)
       main (airflow/__main__.py:40)
       <module> (airflow:8)
   ```
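
One reading of the two dumps, offered as an assumption rather than a confirmed diagnosis: the parent is in `Pool.terminate()`, which sends SIGTERM to each worker and then joins it. A worker that receives the SIGTERM before its initializer has restored the default handlers still runs the handler inherited from the parent, which only logs. That worker then goes on waiting on the task queue and the parent's `join` never returns. The sketch below isolates just that step with a single forked process. The names `_log_only`, `_worker` and `survives_sigterm` are made up for illustration, and the `time.sleep(30)` stands in for the blocked queue read.

```python
import multiprocessing as mp
import os
import signal
import time


def _log_only(signum, frame):
    # Mirrors _exit_gracefully in the example: it logs but never exits.
    print(f"{os.getpid()} Exiting gracefully upon receiving signal {signum}")


def _worker(reset_first, ready):
    if reset_first:
        # What reset_signals() does once it finally gets to run.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
    ready.set()
    time.sleep(30)  # stand-in for a worker blocked on the task queue


def survives_sigterm(reset_first):
    """Return True if a forked child is still alive after being sent SIGTERM."""
    ctx = mp.get_context("fork")
    # Install the logging handler in the parent so the child inherits it.
    previous = signal.signal(signal.SIGTERM, _log_only)
    try:
        ready = ctx.Event()
        child = ctx.Process(target=_worker, args=(reset_first, ready))
        child.start()
        ready.wait()
        child.terminate()  # sends SIGTERM, as Pool.terminate() does to workers
        child.join(timeout=2)
        alive = child.is_alive()
        if alive:
            child.kill()  # clean up the survivor with SIGKILL
            child.join()
        return alive
    finally:
        # Restore whatever handler the parent had before.
        signal.signal(
            signal.SIGTERM,
            previous if previous is not None else signal.SIG_DFL,
        )
```

Under these assumptions, `survives_sigterm(reset_first=False)` should return `True` (the child logs the signal and keeps running), while `survives_sigterm(reset_first=True)` should return `False` (the default action terminates it).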
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

