This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 82725e176d6 Ensure that we don't try sending any more heartbeat 
messages once the process (#44767)
82725e176d6 is described below

commit 82725e176d69127de3745be7bb691baf09c37d76
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Sat Dec 7 17:31:50 2024 +0000

    Ensure that we don't try sending any more heartbeat messages once the
    process has exited (#44767)
    
    We sometimes noticed in CI that 3 heartbeat requests were made, which
    "shouldn't" happen: once the heartbeat gets a 4xx error response, the
    supervisor is meant to kill the task process.
    
    The cause of this was mostly an artifact of the short heartbeat interval
    we used in the tests, and of how we poll for the subprocess exit code. I
    don't think it could have happened in practice (and it wouldn't affect
    anything if it did), but I've made it more robust anyway.
---
 .../src/airflow/sdk/execution_time/supervisor.py     |  9 ++++++---
 task_sdk/tests/execution_time/test_supervisor.py     | 20 +++++++++++---------
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/task_sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
index 1b2a7ba7577..9b4933de808 100644
--- a/task_sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -518,11 +518,14 @@ class WatchedSubprocess:
             )
             # Block until events are ready or the timeout is reached
             # This listens for activity (e.g., subprocess output) on 
registered file objects
-            self._service_subprocess(max_wait_time=max_wait_time)
+            alive = self._service_subprocess(max_wait_time=max_wait_time) is 
None
 
-            self._send_heartbeat_if_needed()
+            if alive:
+                # We don't need to heartbeat if the process has shutdown, as 
we are just finishing of reading the
+                # logs
+                self._send_heartbeat_if_needed()
 
-            self._handle_task_overtime_if_needed()
+                self._handle_task_overtime_if_needed()
 
     def _handle_task_overtime_if_needed(self):
         """Handle termination of auxiliary processes if the task exceeds the 
configured overtime."""
diff --git a/task_sdk/tests/execution_time/test_supervisor.py 
b/task_sdk/tests/execution_time/test_supervisor.py
index cb43904f172..e51f418dad4 100644
--- a/task_sdk/tests/execution_time/test_supervisor.py
+++ b/task_sdk/tests/execution_time/test_supervisor.py
@@ -349,11 +349,14 @@ class TestWatchedSubprocess:
         """
         import airflow.sdk.execution_time.supervisor
 
-        monkeypatch.setattr(airflow.sdk.execution_time.supervisor, 
"MIN_HEARTBEAT_INTERVAL", 0.1)
+        # Heartbeat every time around the loop
+        monkeypatch.setattr(airflow.sdk.execution_time.supervisor, 
"MIN_HEARTBEAT_INTERVAL", 0.0)
 
         def subprocess_main():
             sys.stdin.readline()
             sleep(5)
+            # Shouldn't get here
+            exit(5)
 
         ti_id = uuid7()
 
@@ -389,23 +392,22 @@ class TestWatchedSubprocess:
         # Wait for the subprocess to finish -- it should have been terminated
         assert proc.wait() == -signal.SIGTERM
 
-        count_request = request_count["count"]
+        assert request_count["count"] == 2
         # Verify the number of requests made
-        assert count_request >= 2
         assert captured_logs == [
             {
-                "event": "Server indicated the task shouldn't be running 
anymore",
-                "level": "error",
-                "status_code": 409,
-                "logger": "supervisor",
-                "timestamp": mocker.ANY,
                 "detail": {
                     "reason": "not_running",
                     "message": "TI is no longer in the running state and task 
should terminate",
                     "current_state": "success",
                 },
+                "event": "Server indicated the task shouldn't be running 
anymore",
+                "level": "error",
+                "status_code": 409,
+                "logger": "supervisor",
+                "timestamp": mocker.ANY,
             }
-        ] * (count_request - 1)
+        ]
 
     @pytest.mark.parametrize("captured_logs", [logging.WARNING], indirect=True)
     def test_heartbeat_failures_handling(self, monkeypatch, mocker, 
captured_logs, time_machine):

Reply via email to