kaxil commented on code in PR #44406:
URL: https://github.com/apache/airflow/pull/44406#discussion_r1861378015


##########
task_sdk/tests/execution_time/test_supervisor.py:
##########
@@ -258,6 +261,55 @@ def test_run_simple_dag(self, test_dags_dir, 
captured_logs, time_machine):
             "timestamp": "2024-11-07T12:34:56.078901Z",
         } in captured_logs
 
+    def test_state_conflict_on_heartbeat(self, captured_logs, monkeypatch, 
mocker):
+        """
+        Test that ensures that the Supervisor does not cause the task to fail 
if the Task Instance is no longer
+        in the running state.
+        """
+        
structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.ERROR))

Review Comment:
   Moved it to the `captured_logs` fixture; fixed in 
https://github.com/apache/airflow/pull/44406/commits/6ed56ce1f987990b1adbc1172c2de357e45a2413
 



##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -451,10 +449,38 @@ def _check_subprocess_exit(self):
 
     def _send_heartbeat_if_needed(self):
         """Send a heartbeat to the client if heartbeat interval has passed."""
-        if time.monotonic() - self._last_heartbeat >= 
FASTEST_HEARTBEAT_INTERVAL:
+        # If the process has exited, we don't need to send any more heartbeats
+        if self._exit_code is not None:
+            return
+
+        if time.monotonic() - self._last_heartbeat >= MIN_HEARTBEAT_INTERVAL:
             try:
                 self.client.task_instances.heartbeat(self.ti_id, 
pid=self._process.pid)
                 self._last_heartbeat = time.monotonic()
+            except ServerResponseError as e:
+                # TODO: Should we instead check httpx.HTTPStatusError?
+                if e.response.status_code == 409:
+                    reason = e.detail.get("reason", "")
+                    if reason == "not_running":
+                        error_msg = e.detail.get(
+                            "message", "Task is no longer in the running state 
and task should terminate"
+                        )
+                        log.error(error_msg, 
current_state=e.detail.get("current_state"))
+                    elif reason == "running_elsewhere":
+                        error_msg = e.detail.get("message", "Task is already 
running elsewhere")
+                        log.error(
+                            error_msg,
+                            current_hostname=e.detail.get("current_hostname"),
+                            current_pid=e.detail.get("current_pid"),
+                        )

Review Comment:
   good call - resolved in 
https://github.com/apache/airflow/pull/44406/commits/6ed56ce1f987990b1adbc1172c2de357e45a2413



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to