kaxil commented on code in PR #44626:
URL: https://github.com/apache/airflow/pull/44626#discussion_r1876098693
##########
task_sdk/tests/execution_time/test_supervisor.py:
##########
@@ -179,6 +179,31 @@ def subprocess_main():
assert rc == -9
+ def test_supervisor_signal_handling(self, mocker):
+ """Verify that the supervisor correctly handles signals and terminates
the task process."""
+ mock_logger = mocker.patch("airflow.sdk.execution_time.supervisor.log")
+ mock_kill =
mocker.patch("airflow.sdk.execution_time.supervisor.WatchedSubprocess.kill")
+
+ proc = WatchedSubprocess(
+ ti_id=TI_ID, pid=12345, stdin=mocker.Mock(),
process=mocker.Mock(), client=mocker.Mock()
+ )
+
+ # Send a SIGTERM signal to the supervisor
+ proc._setup_signal_handlers()
+ os.kill(os.getpid(), signal.SIGTERM)
+
+ # Verify task process termination and log messages
+ # Asserting that `proc.kill` is called with the correct signal is
sufficient to verify the supervisor
+ # correctly handles the signal and terminates the task process
+ # The actual signal sent to the task process is tested in
`TestWatchedSubprocessKill` class
+ mock_kill.assert_called_once_with(signal.SIGTERM, force=True)
+ mock_logger.error.assert_called_once_with(
+ "Received termination signal in supervisor. Terminating watched
subprocess",
Review Comment:
An alternative is:
```python
def test_supervisor_signal_handling(self, mocker, monkeypatch, captured_logs):
    """Verify that the supervisor correctly handles signals and terminates the task process.

    The watched subprocess sends SIGTERM to the pid captured in the test
    process; the test then checks the exit code, the final task state, the
    two structured log records, and the ``finish`` call made on the client.
    """
    # NOTE(review): presumably lowered so the supervisor loop wakes up often
    # enough for the test to finish quickly -- confirm against supervisor.py.
    mocker.patch("airflow.sdk.execution_time.supervisor.MIN_HEARTBEAT_INTERVAL", 0.01)
    mock_client = MagicMock(spec=sdk_client.Client)

    # Captured here, in the test process, *before* the subprocess is started,
    # so the closure below signals this process rather than itself.
    # NOTE(review): assumes `WatchedSubprocess.start` runs `target` in a child
    # process while the supervisor stays in this one -- confirm.
    supervisor_pid = os.getpid()

    def subprocess_main():
        os.kill(supervisor_pid, signal.SIGTERM)
        # It should not take 5 seconds. The process should terminate immediately
        sleep(5)

    proc = WatchedSubprocess.start(
        path=os.devnull,
        ti=TaskInstance(id=TI_ID, task_id="b", dag_id="c", run_id="d", try_number=1),
        client=mock_client,
        target=subprocess_main,
    )

    rc = proc.wait()

    # A negative return code of -N means the child was terminated by signal N.
    assert rc == -signal.SIGTERM
    assert proc.final_state == TerminalTIState.FAILED
    assert proc._exit_code == -signal.SIGTERM

    # Record expected when the supervisor receives the signal.
    assert {
        'signal': signal.SIGTERM,
        'process_pid': proc.pid,
        'supervisor_pid': mocker.ANY,
        'event': 'Received termination signal in supervisor. Terminating watched subprocess',
        'timestamp': mocker.ANY,
        'level': 'error',
        'logger': 'supervisor'
    } in captured_logs

    # Record expected once the watched subprocess has exited.
    assert {
        'pid': proc.pid,
        'exit_code': -signal.SIGTERM,
        'signal': "SIGTERM",
        'event': 'Process exited',
        'timestamp': mocker.ANY,
        'level': 'info',
        'logger': 'supervisor'
    } in captured_logs

    mock_client.task_instances.finish.assert_called_once_with(
        id=TI_ID, state=TerminalTIState.FAILED, when=mocker.ANY
    )
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]