This is an automated email from the ASF dual-hosted git repository. uranusjr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 46704eed53 Throttle streaming log reads (#28818)
46704eed53 is described below

commit 46704eed5340ef0e71cf828bb560b3d00aa88691
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Jan 10 22:16:11 2023 -0800

    Throttle streaming log reads (#28818)

    Co-authored-by: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com>
---
 airflow/utils/log/log_reader.py | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index 7ad4700195..5cc8b9377e 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import logging
+import time
 from typing import Iterator
 
 from sqlalchemy.orm.session import Session
@@ -33,6 +34,9 @@ from airflow.utils.state import State
 class TaskLogReader:
     """Task log reader"""
 
+    STREAM_LOOP_SLEEP_SECONDS = 0.5
+    """Time to sleep between loops while waiting for more logs"""
+
     def read_log_chunks(
         self, ti: TaskInstance, try_number: int | None, metadata
     ) -> tuple[list[tuple[tuple[str, str]]], dict[str, str]]:
@@ -77,12 +81,19 @@ class TaskLogReader:
             metadata.pop("max_offset", None)
             metadata.pop("offset", None)
             metadata.pop("log_pos", None)
-            while "end_of_log" not in metadata or (
-                not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
-            ):
+            while True:
                 logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                 for host, log in logs[0]:
                     yield "\n".join([host or "", log]) + "\n"
+                if "end_of_log" not in metadata or (
+                    not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
+                ):
+                    if not logs[0]:
+                        # we did not receive any logs in this loop
+                        # sleeping to conserve resources / limit requests on external services
+                        time.sleep(self.STREAM_LOOP_SLEEP_SECONDS)
+                else:
+                    break
 
     @cached_property
     def log_handler(self):