This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f76227dbd6 Forward termination signals from supervisor to task subprocess (#61627)
4f76227dbd6 is described below

commit 4f76227dbd62f6d0d28cbc7580d090b5236fd30d
Author: André Ahlert <[email protected]>
AuthorDate: Sat May 30 19:02:38 2026 -0300

    Forward termination signals from supervisor to task subprocess (#61627)
    
    When a Kubernetes worker pod receives SIGTERM (spot interruption, scaling
    down, rolling update), the signal is delivered to the supervisor process
    (PID 1 in the container). The supervisor has no signal handler installed
    and exits with default behavior, leaving the task subprocess orphaned
    without ever calling the operator's `on_kill()` hook. Spawned resources
    (pods, subprocesses, etc.) are never cleaned up.
    
    The task subprocess already has a SIGTERM handler registered in
    `task_runner.py` that calls `on_kill()`, but the signal never reaches it
    because the supervisor process terminates first.
    
    Install SIGTERM/SIGINT signal handlers in `ActivitySubprocess.wait()`
    that forward the received signal to the task subprocess via `os.kill()`.
    The child's existing handler then calls `on_kill()` as expected,
    restoring the Airflow 2 behavior. Handlers are saved before
    `_monitor_subprocess()` and restored in a `finally` block.
    
    Placing the handler in `ActivitySubprocess.wait()` (rather than
    `supervise_task`) makes the forwarding work for any coordinator that
    uses `ActivitySubprocess` and keeps the handler colocated with the
    process it forwards to.
    
    Fixes: #58936
    
    Signed-off-by: André Ahlert <[email protected]>
---
 .../src/airflow/sdk/execution_time/supervisor.py   | 21 +++++++++
 .../tests/task_sdk/dags/signal_forward_test.py     | 44 +++++++++++++++++++
 .../task_sdk/execution_time/test_supervisor.py     | 51 +++++++++++++++++++++-
 3 files changed, 115 insertions(+), 1 deletion(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 781e98ae2f6..12d64dcf3f7 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1391,10 +1391,31 @@ class ActivitySubprocess(WatchedSubprocess):
         if self._exit_code is not None:
             return self._exit_code
 
+        # Forward termination signals to the task subprocess so the operator's
+        # on_kill() hook runs on graceful shutdown (e.g. K8s pod SIGTERM).
+        # Without this the supervisor exits on SIGTERM without notifying the
+        # child, leaving spawned resources (pods, subprocesses, etc.) running.
+        prev_sigterm = signal.getsignal(signal.SIGTERM)
+        prev_sigint = signal.getsignal(signal.SIGINT)
+
+        def _forward_signal(signum, frame):
+            log.info(
+                "Received signal, forwarding to task subprocess",
+                signal=signal.Signals(signum).name,
+                pid=self.pid,
+            )
+            with suppress(ProcessLookupError):
+                os.kill(self.pid, signum)
+
+        signal.signal(signal.SIGTERM, _forward_signal)
+        signal.signal(signal.SIGINT, _forward_signal)
+
         try:
             self._monitor_subprocess()
         finally:
             self.selector.close()
+            signal.signal(signal.SIGTERM, prev_sigterm)
+            signal.signal(signal.SIGINT, prev_sigint)
 
         # self._monitor_subprocess() will set the exit code when the process has finished
         # If it hasn't, assume it's failed
diff --git a/task-sdk/tests/task_sdk/dags/signal_forward_test.py b/task-sdk/tests/task_sdk/dags/signal_forward_test.py
new file mode 100644
index 00000000000..ff85eda41a6
--- /dev/null
+++ b/task-sdk/tests/task_sdk/dags/signal_forward_test.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+import signal
+import time
+
+from airflow.sdk.bases.operator import BaseOperator
+from airflow.sdk.definitions.dag import dag
+
+
+class SignalForwardOperator(BaseOperator):
+    """Send SIGTERM to the supervisor parent process to exercise signal forwarding."""
+
+    def execute(self, context):
+        print("EXECUTE_STARTED", flush=True)
+        os.kill(os.getppid(), signal.SIGTERM)
+        time.sleep(2)
+
+    def on_kill(self) -> None:
+        print("ON_KILL_CALLED_VIA_SIGNAL_FORWARDING", flush=True)
+
+
+@dag()
+def signal_forward_test():
+    SignalForwardOperator(task_id="signal_task")
+
+
+signal_forward_test()
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 798500d381c..a9ce41bd29a 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -27,8 +27,9 @@ import signal
 import socket
 import subprocess
 import sys
+import threading
 import time
-from contextlib import nullcontext
+from contextlib import nullcontext, suppress
 from dataclasses import dataclass, field
 from datetime import datetime, timezone as dt_timezone
 from operator import attrgetter
@@ -262,6 +263,54 @@ class TestSupervisor:
             with expectation:
                 supervise_task(**kw)
 
+    def test_on_kill_hook_called_when_supervisor_receives_sigterm(
+        self,
+        test_dags_dir,
+        captured_logs,
+        client_with_ti_start,
+    ):
+        """SIGTERM to the supervisor process is forwarded to the task subprocess."""
+        ti = TaskInstanceDTO(
+            id=uuid7(),
+            task_id="signal_task",
+            dag_id="signal_forward_test",
+            run_id="r",
+            try_number=1,
+            dag_version_id=uuid7(),
+            pool_slots=1,
+            queue="default",
+            priority_weight=1,
+        )
+        bundle_info = BundleInfo(name="my-bundle", version=None)
+
+        supervisor_pid = os.getpid()
+
+        def _kill_children():
+            for child in psutil.Process(supervisor_pid).children(recursive=True):
+                with suppress(psutil.NoSuchProcess):
+                    child.kill()
+
+        watchdog = threading.Timer(20.0, _kill_children)
+        watchdog.daemon = True
+        watchdog.start()
+
+        try:
+            with patch.dict(os.environ, local_dag_bundle_cfg(test_dags_dir, bundle_info.name)):
+                supervise_task(
+                    ti=ti,
+                    dag_rel_path="signal_forward_test.py",
+                    token="",
+                    dry_run=True,
+                    client=client_with_ti_start,
+                    bundle_info=bundle_info,
+                )
+        finally:
+            watchdog.cancel()
+
+        stdout_events = [entry["event"] for entry in captured_logs if entry.get("logger") == "task.stdout"]
+        assert "EXECUTE_STARTED" in stdout_events
+        assert "ON_KILL_CALLED_VIA_SIGNAL_FORWARDING" in stdout_events
+
 
 @pytest.mark.usefixtures("disable_capturing")
 class TestWatchedSubprocess:

Reply via email to