This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f9e8969ff7 Add SIGUSR2 handler for LocalTaskJob and workers to aid debugging (#28309)
f9e8969ff7 is described below

commit f9e8969ff77d9a6498a779a912f78ca309c95aaa
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Mon Dec 12 22:34:03 2022 +0000

    Add SIGUSR2 handler for LocalTaskJob and workers to aid debugging (#28309)
    
    There have been multiple reports of people with tasks stuck in the
    running state, and no obvious activity from the running task, but the
    supervisor is still actively heartbeating.
    
    In order to make it easier (or even possible) to tell _where_ the process
    is stuck, we add a SIGUSR2 handler to the Task supervisor (purposefully
    inherited by the actual task process itself) that prints the current
    stack trace of every thread on receiving USR2. This is the same signal
    we use for causing a debug dump in the Scheduler.
---
 airflow/jobs/local_task_job.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index 3789c4259a..dcebe021e7 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -120,8 +120,19 @@ class LocalTaskJob(BaseJob):
             self.handle_task_exit(128 + signum)
             raise AirflowException("Segmentation Fault detected.")
 
+        def sigusr2_debug_handler(signum, frame):
+            import sys
+            import threading
+            import traceback
+
+            id2name = {th.ident: th.name for th in threading.enumerate()}
+            for threadId, stack in sys._current_frames().items():
+                print(id2name[threadId])
+                traceback.print_stack(f=stack)
+
         signal.signal(signal.SIGSEGV, segfault_signal_handler)
         signal.signal(signal.SIGTERM, signal_handler)
+        signal.signal(signal.SIGUSR2, sigusr2_debug_handler)
 
         if not self.task_instance.check_and_change_state_before_execution(
             mark_success=self.mark_success,
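
The handler added in the diff above can be tried outside Airflow with a small standalone script. The following is a minimal sketch, not the committed code: the helper names `format_all_stacks` and `install_debug_handler` are hypothetical, the output goes to stderr only, and unknown thread idents fall back to a placeholder rather than raising `KeyError`. It assumes a POSIX platform, since `SIGUSR2` is not available on Windows.

```python
import os
import signal
import sys
import threading
import traceback


def format_all_stacks():
    """Return a string with the current stack of every Python thread."""
    # Map thread idents to names. Threads not created through `threading`
    # may be missing here, so fall back to a placeholder name below.
    id2name = {th.ident: th.name for th in threading.enumerate()}
    chunks = []
    for thread_id, stack in sys._current_frames().items():
        name = id2name.get(thread_id, "<unknown thread>")
        chunks.append(f"Thread {name} (ident={thread_id}):\n")
        chunks.extend(traceback.format_stack(f=stack))
    return "".join(chunks)


def sigusr2_debug_handler(signum, frame):
    # Write the dump to stderr so it ends up in the process log.
    sys.stderr.write(format_all_stacks())
    sys.stderr.flush()


def install_debug_handler():
    # signal.signal() must be called from the main thread.
    signal.signal(signal.SIGUSR2, sigusr2_debug_handler)


if __name__ == "__main__":
    install_debug_handler()
    # Sending the signal to ourselves mimics `kill -USR2 <pid>` from a shell.
    os.kill(os.getpid(), signal.SIGUSR2)
```

With a handler like this installed, running `kill -USR2 <pid>` against the stuck process should print one stack per thread without terminating it, which is the debugging workflow the commit message describes.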
