ashb commented on code in PR #44406:
URL: https://github.com/apache/airflow/pull/44406#discussion_r1862662090


##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -265,7 +270,13 @@ class WatchedSubprocess:
     _terminal_state: str | None = None
     _final_state: str | None = None
 
-    _last_heartbeat: float = 0
+    _last_successful_heartbeat: float = 0
+    _last_heartbeat_attempt: float = 0

Review Comment:
   ```suggestion
    _last_successful_heartbeat: float = attrs.field(default=0, init=False)
    _last_heartbeat_attempt: float = attrs.field(default=0, init=False)
   ```
   
   to make them not show up as args in `__init__`
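The suggestion uses the attrs API; the same `init=False` behaviour can be sketched with the stdlib `dataclasses` module (the class and field names here are illustrative, not the PR's code):

```python
# Stdlib analogue of the attrs suggestion: field(init=False) keeps internal
# bookkeeping attributes out of the generated __init__ signature.
from dataclasses import dataclass, field

@dataclass
class Watched:
    pid: int
    last_successful_heartbeat: float = field(default=0.0, init=False)
    last_heartbeat_attempt: float = field(default=0.0, init=False)

w = Watched(pid=42)  # the heartbeat fields cannot be passed to __init__
print(w.last_successful_heartbeat)  # 0.0
```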



##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -265,7 +270,13 @@ class WatchedSubprocess:
     _terminal_state: str | None = None
     _final_state: str | None = None
 
-    _last_heartbeat: float = 0
+    _last_successful_heartbeat: float = 0
+    _last_heartbeat_attempt: float = 0
+
+    # After the failure of a heartbeat, we'll increment this counter. If it reaches `MAX_FAILED_HEARTBEATS`, we
+    # will kill the process. This is to handle temporary network issues etc. ensuring that the process
+    # does not hang around forever.
+    failed_heartbeats: int = 0

Review Comment:
   ```suggestion
    failed_heartbeats: int = attrs.field(default=0, init=False)
   ```
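A hedged sketch of the counter logic the diff comment describes; the `MAX_FAILED_HEARTBEATS` value and the helper class are assumptions for illustration, not the PR's code:

```python
# Illustrative escalation logic: consecutive failures are counted, the counter
# resets on any success, and hitting the limit means "kill the process".
MAX_FAILED_HEARTBEATS = 3  # assumed value for illustration

class HeartbeatTracker:
    def __init__(self) -> None:
        self.failed_heartbeats = 0

    def record(self, ok: bool) -> bool:
        """Record one heartbeat attempt; return True if the process should be killed."""
        if ok:
            self.failed_heartbeats = 0
            return False
        self.failed_heartbeats += 1
        return self.failed_heartbeats >= MAX_FAILED_HEARTBEATS

t = HeartbeatTracker()
# A transient failure is forgiven by the following success; only three
# consecutive failures trip the limit.
print([t.record(ok) for ok in (False, True, False, False, False)])
# → [False, False, False, False, True]
```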



##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -423,38 +434,54 @@ def _monitor_subprocess(self):
 
         This function:
 
-        - Polls the subprocess for output
-        - Sends heartbeats to the client to keep the task alive
-        - Checks if the subprocess has exited
+        - Waits for activity on file objects (e.g., subprocess stdout, stderr) using the selector.
+        - Processes events triggered on the monitored file objects, such as data availability or EOF.
+        - Sends heartbeats to ensure the process is alive and checks if the subprocess has exited.
         """
-        # Until we have a selector for the process, don't poll for more than 10s, just in case it exists but
-        # doesn't produce any output
-        max_poll_interval = 10
-
         while self._exit_code is None or len(self.selector.get_map()):
-            last_heartbeat_ago = time.monotonic() - self._last_heartbeat
+            last_heartbeat_ago = time.monotonic() - self._last_successful_heartbeat
             # Monitor the task to see if it's done. Wait in a syscall (`select`) for as long as possible
             # so we notice the subprocess finishing as quick as we can.
             max_wait_time = max(
                 0,  # Make sure this value is never negative,
                 min(
                    # Ensure we heartbeat _at most_ 75% through time the zombie threshold time
-                    SLOWEST_HEARTBEAT_INTERVAL - last_heartbeat_ago * 0.75,
-                    max_poll_interval,
+                    HEARTBEAT_THRESHOLD - last_heartbeat_ago * 0.75,
+                    MIN_HEARTBEAT_INTERVAL,
                 ),
             )
+            # Block until events are ready or the timeout is reached
+            # This listens for activity (e.g., subprocess output) on registered file objects
             events = self.selector.select(timeout=max_wait_time)
-            for key, _ in events:
-                socket_handler = key.data
-                need_more = socket_handler(key.fileobj)
-
-                if not need_more:
-                    self.selector.unregister(key.fileobj)
-                    key.fileobj.close()  # type: ignore[union-attr]
+            self._process_file_object_events(events)
 
             self._check_subprocess_exit()
             self._send_heartbeat_if_needed()
 
+    def _process_file_object_events(self, events: list[tuple[SelectorKey, int]]):
+        """
+        Process selector events by invoking handlers for each file object.
+
+        For each file object event, this method retrieves the associated handler and processes
+        the event. If the handler indicates that the file object no longer needs
+        monitoring (e.g., EOF or closed), the file object is unregistered and closed.
+        """
+        for key, _ in events:
+            # Retrieve the handler responsible for processing this file object (e.g., stdout, stderr)
+            socket_handler = key.data
+
+            # Example of handler behavior:
+            # If the subprocess writes "Hello, World!" to stdout:
+            # - `socket_handler` reads and processes the message.
+            # - If EOF is reached, the handler returns False to signal no more reads are expected.
+            need_more = socket_handler(key.fileobj)
+
+            # If the handler signals that the file object is no longer needed (EOF, closed, etc.)
+            # unregister it from the selector to stop monitoring and close it cleanly.

Review Comment:
   ```suggestion
            # unregister it from the selector to stop monitoring; `wait()` blocks until all selectors
            # are removed
   ```
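A self-contained sketch of the handler pattern this suggestion refers to, using a POSIX pipe and illustrative names (this is not the supervisor's actual code): the handler returns `False` at EOF, the file object is unregistered, and the loop exits once the selector map is empty.

```python
# Hedged sketch: a selector loop that unregisters file objects when their
# handler signals EOF, draining out once nothing is left to monitor.
import os
import selectors

sel = selectors.DefaultSelector()
r, w = os.pipe()
rf = os.fdopen(r, "rb", buffering=0)

received = []

def on_readable(fileobj):
    data = fileobj.read(4096)
    if not data:          # EOF: the write end was closed
        return False      # signal "no more reads are expected"
    received.append(data)
    return True

sel.register(rf, selectors.EVENT_READ, data=on_readable)
os.write(w, b"Hello, World!")
os.close(w)               # triggers EOF on the read end

while sel.get_map():      # loop exits once everything is unregistered
    for key, _ in sel.select(timeout=1):
        if not key.data(key.fileobj):
            sel.unregister(key.fileobj)
            key.fileobj.close()

print(b"".join(received))  # b'Hello, World!'
```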



##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -423,38 +434,54 @@ def _monitor_subprocess(self):
 
         This function:
 
-        - Polls the subprocess for output
-        - Sends heartbeats to the client to keep the task alive
-        - Checks if the subprocess has exited
+        - Waits for activity on file objects (e.g., subprocess stdout, stderr) using the selector.

Review Comment:
   Logs and requests socket too



##########
task_sdk/tests/execution_time/test_supervisor.py:
##########
@@ -299,6 +289,146 @@ def handle_request(request: httpx.Request) -> httpx.Response:
             "previous_state": "running",
         }
 
+    @pytest.mark.parametrize("captured_logs", [logging.ERROR], indirect=True, ids=["log_level=error"])
+    def test_state_conflict_on_heartbeat(self, captured_logs, monkeypatch, mocker):
+        """
+        Test that ensures that the Supervisor does not cause the task to fail if the Task Instance is no longer
+        in the running state. Instead, it logs the error and terminates the task process if it
+        might be running in a different state or has already completed -- or running on a different worker.
+        """
+        import airflow.sdk.execution_time.supervisor
+
+        monkeypatch.setattr(airflow.sdk.execution_time.supervisor, "MIN_HEARTBEAT_INTERVAL", 0.1)
+
+        def subprocess_main():
+            sys.stdin.readline()
+            sleep(5)
+
+        ti_id = uuid7()
+
+        # Track the number of requests to simulate mixed responses
+        request_count = {"count": 0}
+
+        def handle_request(request: httpx.Request) -> httpx.Response:
+            if request.url.path == f"/task-instances/{ti_id}/heartbeat":
+                request_count["count"] += 1
+                if request_count["count"] == 1:
+                    # First request succeeds
+                    return httpx.Response(status_code=204)
+                else:
+                    # Second request returns a conflict status code
+                    return httpx.Response(
+                        409,
+                        json={
+                            "reason": "not_running",
+                            "message": "TI is no longer in the running state 
and task should terminate",
+                            "current_state": "success",
+                        },
+                    )
+            # Return a 204 for all other requests like the initial call to mark the task as running
+            return httpx.Response(status_code=204)
+
+        proc = WatchedSubprocess.start(
+            path=os.devnull,
+            ti=TaskInstance(id=ti_id, task_id="b", dag_id="c", run_id="d", try_number=1),
+            client=make_client(transport=httpx.MockTransport(handle_request)),
+            target=subprocess_main,
+        )
+
+        # Wait for the subprocess to finish

Review Comment:
   ```suggestion
        # Wait for the subprocess to finish -- it should have been terminated
   ```



##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -466,14 +493,48 @@ def _check_subprocess_exit(self):
 
     def _send_heartbeat_if_needed(self):
         """Send a heartbeat to the client if heartbeat interval has passed."""
-        if time.monotonic() - self._last_heartbeat >= FASTEST_HEARTBEAT_INTERVAL:
-            try:
-                self.client.task_instances.heartbeat(self.ti_id, pid=self._process.pid)
-                self._last_heartbeat = time.monotonic()
-            except Exception:
-                log.warning("Failed to send heartbeat", exc_info=True)
-                # TODO: If we couldn't heartbeat for X times the interval, kill ourselves
-                pass
+        # Respect the minimum interval between heartbeat attempts
+        if (time.monotonic() - self._last_heartbeat_attempt) < MIN_HEARTBEAT_INTERVAL:
+            return
+
+        self._last_heartbeat_attempt = time.monotonic()
+        try:
+            self.client.task_instances.heartbeat(self.ti_id, pid=self._process.pid)
+            # Update the last heartbeat time on success
+            self._last_successful_heartbeat = time.monotonic()
+
+            # Reset the counter on success
+            self.failed_heartbeats = 0
+        except ServerResponseError as e:
+            if e.response.status_code in {409, 404}:

Review Comment:
   Should we use the [http.HTTPStatus enum](https://docs.python.org/3/library/http.html#http.HTTPStatus) here?
   
   ```suggestion
            if e.response.status_code in {HTTPStatus.CONFLICT, HTTPStatus.NOT_FOUND}:
   ```
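As the review suggests, `http.HTTPStatus` members are `IntEnum` values, so they compare and hash equal to plain integers and the set-membership check keeps working against an integer status code:

```python
# http.HTTPStatus members interoperate with the bare 409/404 integers used in
# the PR, so swapping them in is purely a readability change.
from http import HTTPStatus

assert HTTPStatus.CONFLICT == 409
assert HTTPStatus.NOT_FOUND == 404

status_code = 409  # e.g. e.response.status_code
print(status_code in {HTTPStatus.CONFLICT, HTTPStatus.NOT_FOUND})  # True
```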



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
