This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-11-test by this push:
     new 2b8818a7552 [v2-11-test] Stop streaming task logs if end of log mark is missing (#51904)
2b8818a7552 is described below

commit 2b8818a75528f7ed480ceb5a2bf31b18bf38ca5d
Author: Jed Cunningham <[email protected]>
AuthorDate: Fri Jun 20 10:11:37 2025 -0600

    [v2-11-test] Stop streaming task logs if end of log mark is missing (#51904)
    
    Sometimes, somehow, the end of log mark can be missing, and when that
    happens the streaming log reader enters an infinite loop. Instead, if
    the task is in a non-running state and we stop receiving log lines but
    never get the end of log mark, we assume we won't and stop trying. We do
    emit a message saying that we are stopping, though.
---
 airflow/utils/log/log_reader.py    | 13 ++++++++++++-
 tests/utils/log/test_log_reader.py | 21 +++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index ad61a139086..d8639996178 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -39,6 +39,9 @@ class TaskLogReader:
     STREAM_LOOP_SLEEP_SECONDS = 1
     """Time to sleep between loops while waiting for more logs"""
 
+    STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 5
+    """Number of empty loop iterations before stopping the stream"""
+
     def read_log_chunks(
         self, ti: TaskInstance, try_number: int | None, metadata
     ) -> tuple[list[tuple[tuple[str, str]]], dict[str, str]]:
@@ -83,6 +86,7 @@ class TaskLogReader:
             metadata.pop("max_offset", None)
             metadata.pop("offset", None)
             metadata.pop("log_pos", None)
+            empty_iterations = 0
             while True:
                 logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                 for host, log in logs[0]:
@@ -91,10 +95,17 @@ class TaskLogReader:
                     not metadata["end_of_log"]
                     and ti.state not in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED)
                 ):
-                    if not logs[0]:
+                    if logs[0]:
+                        empty_iterations = 0
+                    else:
                         # we did not receive any logs in this loop
                         # sleeping to conserve resources / limit requests on external services
                         time.sleep(self.STREAM_LOOP_SLEEP_SECONDS)
+                        empty_iterations += 1
+                        if empty_iterations >= self.STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS:
+                            # we have not received any logs for a while, so we stop the stream
+                            yield "\n(Log stream stopped - End of log marker not found; logs may be incomplete.)\n"
+                            break
                 else:
                     break
 
diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py
index d4417bfba82..598ed8bcbba 100644
--- a/tests/utils/log/test_log_reader.py
+++ b/tests/utils/log/test_log_reader.py
@@ -225,6 +225,27 @@ class TestLogView:
             any_order=False,
         )
 
+    @mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read")
+    def test_read_log_stream_no_end_of_log_marker(self, mock_read):
+        mock_read.side_effect = [
+            ([[("", "hello")]], [{"end_of_log": False}]),
+            ([[]], [{"end_of_log": False}]),
+            ([[]], [{"end_of_log": False}]),
+            ([[]], [{"end_of_log": False}]),
+            ([[]], [{"end_of_log": False}]),
+            ([[]], [{"end_of_log": False}]),
+        ]
+
+        self.ti.state = TaskInstanceState.SUCCESS
+        task_log_reader = TaskLogReader()
+        task_log_reader.STREAM_LOOP_SLEEP_SECONDS = 0.001  # to speed up the test
+        log_stream = task_log_reader.read_log_stream(ti=self.ti, try_number=1, metadata={})
+        assert list(log_stream) == [
+            "\nhello\n",
+            "\n(Log stream stopped - End of log marker not found; logs may be incomplete.)\n",
+        ]
+        assert mock_read.call_count == 6
+
     def test_supports_external_link(self):
         task_log_reader = TaskLogReader()
 

Reply via email to