This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 97bbf3ba031 Allow more empty loops before stopping log streaming
(#52614)
97bbf3ba031 is described below
commit 97bbf3ba031ebea162d649157cd8c5c1adcdb12e
Author: Jed Cunningham <[email protected]>
AuthorDate: Tue Jul 1 02:24:11 2025 -0600
Allow more empty loops before stopping log streaming (#52614)
In #50715 we started short-circuiting if we hit 5 iterations of no new
log messages. This works well, except in the scenario where there are no
log messages at all. The ES log handler has its own short-circuit for that
scenario, but it triggers based on time and that works out to ~7
iterations. Let's let ES have the first crack at it so the user gets a
better message.
Co-authored-by: Rahul Vats <[email protected]>
---
airflow-core/src/airflow/utils/log/log_reader.py | 3 ++-
airflow-core/tests/unit/utils/log/test_log_reader.py | 8 ++------
2 files changed, 4 insertions(+), 7 deletions(-)
diff --git a/airflow-core/src/airflow/utils/log/log_reader.py
b/airflow-core/src/airflow/utils/log/log_reader.py
index 6ce10e387e5..0bb61c52dbc 100644
--- a/airflow-core/src/airflow/utils/log/log_reader.py
+++ b/airflow-core/src/airflow/utils/log/log_reader.py
@@ -45,7 +45,8 @@ class TaskLogReader:
STREAM_LOOP_SLEEP_SECONDS = 1
"""Time to sleep between loops while waiting for more logs"""
- STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 5
+
+ STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 10
"""Number of empty loop iterations before stopping the stream"""
def read_log_chunks(
diff --git a/airflow-core/tests/unit/utils/log/test_log_reader.py
b/airflow-core/tests/unit/utils/log/test_log_reader.py
index 2e330773f82..5ca675f8d08 100644
--- a/airflow-core/tests/unit/utils/log/test_log_reader.py
+++ b/airflow-core/tests/unit/utils/log/test_log_reader.py
@@ -222,11 +222,7 @@ class TestLogView:
def test_read_log_stream_no_end_of_log_marker(self, mock_read):
mock_read.side_effect = [
(["hello"], {"end_of_log": False}),
- ([], {"end_of_log": False}),
- ([], {"end_of_log": False}),
- ([], {"end_of_log": False}),
- ([], {"end_of_log": False}),
- ([], {"end_of_log": False}),
+ *[([], {"end_of_log": False}) for _ in range(10)],
]
self.ti.state = TaskInstanceState.SUCCESS
@@ -237,7 +233,7 @@ class TestLogView:
"hello\n",
"(Log stream stopped - End of log marker not found; logs may be
incomplete.)\n",
]
- assert mock_read.call_count == 6
+ assert mock_read.call_count == 11
def test_supports_external_link(self):
task_log_reader = TaskLogReader()