kaxil commented on code in PR #44626:
URL: https://github.com/apache/airflow/pull/44626#discussion_r1876098693


##########
task_sdk/tests/execution_time/test_supervisor.py:
##########
@@ -179,6 +179,31 @@ def subprocess_main():
 
         assert rc == -9
 
+    def test_supervisor_signal_handling(self, mocker):
+        """Verify that the supervisor correctly handles signals and terminates the task process."""
+        mock_logger = mocker.patch("airflow.sdk.execution_time.supervisor.log")
+        mock_kill = mocker.patch("airflow.sdk.execution_time.supervisor.WatchedSubprocess.kill")
+
+        proc = WatchedSubprocess(
+            ti_id=TI_ID, pid=12345, stdin=mocker.Mock(), process=mocker.Mock(), client=mocker.Mock()
+        )
+
+        # Send a SIGTERM signal to the supervisor
+        proc._setup_signal_handlers()
+        os.kill(os.getpid(), signal.SIGTERM)
+
+        # Verify task process termination and log messages
+        # Asserting that `proc.kill` is called with the correct signal is sufficient to verify the supervisor
+        # correctly handles the signal and terminates the task process
+        # The actual signal sent to the task process is tested in `TestWatchedSubprocessKill` class
+        mock_kill.assert_called_once_with(signal.SIGTERM, force=True)
+        mock_logger.error.assert_called_once_with(
+            "Received termination signal in supervisor. Terminating watched subprocess",

Review Comment:
   An alternative is:
   
   ```python
       def test_supervisor_signal_handling(self, mocker, monkeypatch, captured_logs):
           """Verify that the supervisor correctly handles signals and terminates the task process."""
           mocker.patch("airflow.sdk.execution_time.supervisor.MIN_HEARTBEAT_INTERVAL", 0.01)
           mock_client = MagicMock(spec=sdk_client.Client)
           subprocess_pid = os.getpid()
   
           def subprocess_main():
               os.kill(subprocess_pid, signal.SIGTERM)
               # It should not take 5 seconds. The process should terminate immediately
               sleep(5)
   
           proc = WatchedSubprocess.start(
               path=os.devnull,
               ti=TaskInstance(id=TI_ID, task_id="b", dag_id="c", run_id="d", try_number=1),
               client=mock_client,
               target=subprocess_main,
           )
   
           rc = proc.wait()
   
           assert rc == -signal.SIGTERM
           assert proc.final_state == TerminalTIState.FAILED
           assert proc._exit_code == -signal.SIGTERM
   
           assert {
              'signal': signal.SIGTERM,
              'process_pid': proc.pid,
              'supervisor_pid': mocker.ANY,
              'event': 'Received termination signal in supervisor. Terminating watched subprocess',
              'timestamp': mocker.ANY,
              'level': 'error',
              'logger': 'supervisor'
           } in captured_logs
   
           assert {
              'pid': proc.pid,
              'exit_code': -signal.SIGTERM,
              'signal': "SIGTERM",
              'event': 'Process exited',
              'timestamp': mocker.ANY,
              'level': 'info',
              'logger': 'supervisor'
           } in captured_logs
   
           mock_client.task_instances.finish.assert_called_once_with(
               id=TI_ID, state=TerminalTIState.FAILED, when=mocker.ANY
           )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to