kaxil commented on code in PR #44406:
URL: https://github.com/apache/airflow/pull/44406#discussion_r1861378015
##########
task_sdk/tests/execution_time/test_supervisor.py:
##########
@@ -258,6 +261,55 @@ def test_run_simple_dag(self, test_dags_dir,
captured_logs, time_machine):
"timestamp": "2024-11-07T12:34:56.078901Z",
} in captured_logs
+ def test_state_conflict_on_heartbeat(self, captured_logs, monkeypatch,
mocker):
+ """
+ Test that ensures that the Supervisor does not cause the task to fail
if the Task Instance is no longer
+ in the running state.
+ """
+
structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.ERROR))
Review Comment:
Moved it to the `captured_logs` fixture; fixed in
https://github.com/apache/airflow/pull/44406/commits/6ed56ce1f987990b1adbc1172c2de357e45a2413
##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -451,10 +449,38 @@ def _check_subprocess_exit(self):
def _send_heartbeat_if_needed(self):
"""Send a heartbeat to the client if heartbeat interval has passed."""
- if time.monotonic() - self._last_heartbeat >=
FASTEST_HEARTBEAT_INTERVAL:
+ # If the process has exited, we don't need to send any more heartbeats
+ if self._exit_code is not None:
+ return
+
+ if time.monotonic() - self._last_heartbeat >= MIN_HEARTBEAT_INTERVAL:
try:
self.client.task_instances.heartbeat(self.ti_id,
pid=self._process.pid)
self._last_heartbeat = time.monotonic()
+ except ServerResponseError as e:
+ # TODO: Should we instead check httpx.HTTPStatusError?
+ if e.response.status_code == 409:
+ reason = e.detail.get("reason", "")
+ if reason == "not_running":
+ error_msg = e.detail.get(
+ "message", "Task is no longer in the running state
and task should terminate"
+ )
+ log.error(error_msg,
current_state=e.detail.get("current_state"))
+ elif reason == "running_elsewhere":
+ error_msg = e.detail.get("message", "Task is already
running elsewhere")
+ log.error(
+ error_msg,
+ current_hostname=e.detail.get("current_hostname"),
+ current_pid=e.detail.get("current_pid"),
+ )
Review Comment:
Good call - resolved in
https://github.com/apache/airflow/pull/44406/commits/6ed56ce1f987990b1adbc1172c2de357e45a2413
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]