kaxil commented on code in PR #44406:
URL: https://github.com/apache/airflow/pull/44406#discussion_r1861397492


##########
task_sdk/tests/execution_time/test_supervisor.py:
##########
@@ -258,6 +261,55 @@ def test_run_simple_dag(self, test_dags_dir, captured_logs, time_machine):
             "timestamp": "2024-11-07T12:34:56.078901Z",
         } in captured_logs
 
+    def test_state_conflict_on_heartbeat(self, captured_logs, monkeypatch, mocker):
+        """
+        Test that ensures that the Supervisor does not cause the task to fail if the Task Instance is no longer
+        in the running state.
+        """
+        structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.ERROR))
+
+        import airflow.sdk.execution_time.supervisor
+
+        monkeypatch.setattr(airflow.sdk.execution_time.supervisor, "MIN_HEARTBEAT_INTERVAL", 0.1)
+
+        def subprocess_main():
+            sys.stdin.readline()
+            sleep(5)
+
+        ti_id = uuid7()
+
+        def handle_request(request: httpx.Request) -> httpx.Response:
+            if request.url.path == f"/task-instances/{ti_id}/heartbeat":
+                return httpx.Response(
+                    409,
+                    json={
+                        "reason": "not_running",
+                        "message": "TI is no longer in the running state and task should terminate",
+                        "current_state": "success",
+                    },

Review Comment:
   Yeah, done in 
https://github.com/apache/airflow/pull/44406/commits/d57bd63398b9f667f8f179008b05f98a2c74e810



##########
task_sdk/tests/execution_time/test_supervisor.py:
##########
@@ -258,6 +261,55 @@ def test_run_simple_dag(self, test_dags_dir, captured_logs, time_machine):
             "timestamp": "2024-11-07T12:34:56.078901Z",
         } in captured_logs
 
+    def test_state_conflict_on_heartbeat(self, captured_logs, monkeypatch, mocker):
+        """
+        Test that ensures that the Supervisor does not cause the task to fail if the Task Instance is no longer
+        in the running state.
+        """
+        structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.ERROR))

Review Comment:
   Moved it to the `captured_logs` fixture; fixed in 
https://github.com/apache/airflow/pull/44406/commits/d57bd63398b9f667f8f179008b05f98a2c74e810
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
