This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 005292eefce8f03d8b0b0a1c637ba8ca96d19601
Author: Victor Chiapaikeo <[email protected]>
AuthorDate: Fri Jan 12 02:56:54 2024 -0500

    Add log lookup exception for empty op subtypes (#35536)

    * Add log lookup exception for empty op subtypes

    * Use exception catching approach instead to preserve tests

    (cherry picked from commit ddcaef45593a5411859327ab2d16ed648073b986)
---
 airflow/utils/log/file_task_handler.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 3e2561ba75..f86cc77736 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -29,6 +29,7 @@ from pathlib import Path
 from typing import TYPE_CHECKING, Any, Callable, Iterable
 from urllib.parse import urljoin
 
+import httpx
 import pendulum
 
 from airflow.configuration import conf
@@ -78,8 +79,6 @@ def _set_task_deferred_context_var():
 
 
 def _fetch_logs_from_service(url, log_relative_path):
-    import httpx
-
     from airflow.utils.jwt_signer import JWTSigner
 
     timeout = conf.getint("webserver", "log_fetch_timeout_sec", fallback=None)
@@ -170,6 +169,9 @@ class FileTaskHandler(logging.Handler):
     """
 
     trigger_should_wrap = True
+    inherits_from_empty_operator_log_message = (
+        "Operator inherits from empty operator and thus does not have logs"
+    )
 
     def __init__(self, base_log_folder: str, filename_template: str | None = None):
         super().__init__()
@@ -555,8 +557,11 @@ class FileTaskHandler(logging.Handler):
                 messages.append(f"Found logs served from host {url}")
                 logs.append(response.text)
         except Exception as e:
-            messages.append(f"Could not read served logs: {e}")
-            logger.exception("Could not read served logs")
+            if isinstance(e, httpx.UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True:
+                messages.append(self.inherits_from_empty_operator_log_message)
+            else:
+                messages.append(f"Could not read served logs: {e}")
+                logger.exception("Could not read served logs")
         return messages, logs
 
     def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], list[str]]:
