This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 82725e176d6 Ensure that we don't try sending any more heartbeat
messages once the process (#44767)
82725e176d6 is described below
commit 82725e176d69127de3745be7bb691baf09c37d76
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Sat Dec 7 17:31:50 2024 +0000
Ensure that we don't try sending any more heartbeat messages once the
process has exited (#44767).
We sometimes noticed in CI that 3 requests were made, which "shouldn't"
happen: once it receives a 4xx error in response to the heartbeat, it is
meant to kill the task process.
The cause of this was mostly an artifact of the short heartbeat interval we
used in the tests, and of how we poll for the subprocess exit code. I don't
think it could have happened in practice (and it wouldn't affect anything if
it did), but I've made it more robust anyway.
---
.../src/airflow/sdk/execution_time/supervisor.py | 9 ++++++---
task_sdk/tests/execution_time/test_supervisor.py | 20 +++++++++++---------
2 files changed, 17 insertions(+), 12 deletions(-)
diff --git a/task_sdk/src/airflow/sdk/execution_time/supervisor.py
b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
index 1b2a7ba7577..9b4933de808 100644
--- a/task_sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -518,11 +518,14 @@ class WatchedSubprocess:
)
# Block until events are ready or the timeout is reached
# This listens for activity (e.g., subprocess output) on
registered file objects
- self._service_subprocess(max_wait_time=max_wait_time)
+ alive = self._service_subprocess(max_wait_time=max_wait_time) is
None
- self._send_heartbeat_if_needed()
+ if alive:
+ # We don't need to heartbeat if the process has shutdown, as
we are just finishing of reading the
+ # logs
+ self._send_heartbeat_if_needed()
- self._handle_task_overtime_if_needed()
+ self._handle_task_overtime_if_needed()
def _handle_task_overtime_if_needed(self):
"""Handle termination of auxiliary processes if the task exceeds the
configured overtime."""
diff --git a/task_sdk/tests/execution_time/test_supervisor.py
b/task_sdk/tests/execution_time/test_supervisor.py
index cb43904f172..e51f418dad4 100644
--- a/task_sdk/tests/execution_time/test_supervisor.py
+++ b/task_sdk/tests/execution_time/test_supervisor.py
@@ -349,11 +349,14 @@ class TestWatchedSubprocess:
"""
import airflow.sdk.execution_time.supervisor
- monkeypatch.setattr(airflow.sdk.execution_time.supervisor,
"MIN_HEARTBEAT_INTERVAL", 0.1)
+ # Heartbeat every time around the loop
+ monkeypatch.setattr(airflow.sdk.execution_time.supervisor,
"MIN_HEARTBEAT_INTERVAL", 0.0)
def subprocess_main():
sys.stdin.readline()
sleep(5)
+ # Shouldn't get here
+ exit(5)
ti_id = uuid7()
@@ -389,23 +392,22 @@ class TestWatchedSubprocess:
# Wait for the subprocess to finish -- it should have been terminated
assert proc.wait() == -signal.SIGTERM
- count_request = request_count["count"]
+ assert request_count["count"] == 2
# Verify the number of requests made
- assert count_request >= 2
assert captured_logs == [
{
- "event": "Server indicated the task shouldn't be running
anymore",
- "level": "error",
- "status_code": 409,
- "logger": "supervisor",
- "timestamp": mocker.ANY,
"detail": {
"reason": "not_running",
"message": "TI is no longer in the running state and task
should terminate",
"current_state": "success",
},
+ "event": "Server indicated the task shouldn't be running
anymore",
+ "level": "error",
+ "status_code": 409,
+ "logger": "supervisor",
+ "timestamp": mocker.ANY,
}
- ] * (count_request - 1)
+ ]
@pytest.mark.parametrize("captured_logs", [logging.WARNING], indirect=True)
def test_heartbeat_failures_handling(self, monkeypatch, mocker,
captured_logs, time_machine):