This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-11-test by this push:
new 53a9c4a6cf4 Allow more empty loops before stopping log streaming
(#52614) (#52636)
53a9c4a6cf4 is described below
commit 53a9c4a6cf483af6699dddc69ae3db8ed649e261
Author: Rahul Vats <[email protected]>
AuthorDate: Tue Jul 1 16:23:38 2025 +0530
Allow more empty loops before stopping log streaming (#52614) (#52636)
In #50715 we started short-circuiting if we hit 5 iterations of no new
log messages. This works well, except in the scenario where there are no
log messages at all. ES log handler has its own short-circuit for that
scenario, but it triggers based on time and that works out to ~7
iterations. Let's let ES have the first crack at it so the user gets a
better message.
(cherry picked from commit 97bbf3ba031ebea162d649157cd8c5c1adcdb12e)
Co-authored-by: Jed Cunningham
<[email protected]>
---
airflow/utils/log/log_reader.py | 2 +-
tests/utils/log/test_log_reader.py | 8 ++------
2 files changed, 3 insertions(+), 7 deletions(-)
diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index d8639996178..c9573122a25 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -39,7 +39,7 @@ class TaskLogReader:
STREAM_LOOP_SLEEP_SECONDS = 1
"""Time to sleep between loops while waiting for more logs"""
- STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 5
+ STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 10
"""Number of empty loop iterations before stopping the stream"""
def read_log_chunks(
diff --git a/tests/utils/log/test_log_reader.py
b/tests/utils/log/test_log_reader.py
index 598ed8bcbba..2463db11a5b 100644
--- a/tests/utils/log/test_log_reader.py
+++ b/tests/utils/log/test_log_reader.py
@@ -229,11 +229,7 @@ class TestLogView:
def test_read_log_stream_no_end_of_log_marker(self, mock_read):
mock_read.side_effect = [
([[("", "hello")]], [{"end_of_log": False}]),
- ([[]], [{"end_of_log": False}]),
- ([[]], [{"end_of_log": False}]),
- ([[]], [{"end_of_log": False}]),
- ([[]], [{"end_of_log": False}]),
- ([[]], [{"end_of_log": False}]),
+ *[([[]], [{"end_of_log": False}]) for _ in range(10)],
]
self.ti.state = TaskInstanceState.SUCCESS
@@ -244,7 +240,7 @@ class TestLogView:
"\nhello\n",
"\n(Log stream stopped - End of log marker not found; logs may be
incomplete.)\n",
]
- assert mock_read.call_count == 6
+ assert mock_read.call_count == 11
def test_supports_external_link(self):
task_log_reader = TaskLogReader()