This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ddcaef4559 Add log lookup exception for empty op subtypes (#35536)
ddcaef4559 is described below
commit ddcaef45593a5411859327ab2d16ed648073b986
Author: Victor Chiapaikeo <[email protected]>
AuthorDate: Fri Jan 12 02:56:54 2024 -0500
Add log lookup exception for empty op subtypes (#35536)
* Add log lookup exception for empty op subtypes
* Use exception catching approach instead to preserve tests
---
airflow/utils/log/file_task_handler.py | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/airflow/utils/log/file_task_handler.py
b/airflow/utils/log/file_task_handler.py
index 3e2561ba75..f86cc77736 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -29,6 +29,7 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Iterable
from urllib.parse import urljoin
+import httpx
import pendulum
from airflow.configuration import conf
@@ -78,8 +79,6 @@ def _set_task_deferred_context_var():
def _fetch_logs_from_service(url, log_relative_path):
- import httpx
-
from airflow.utils.jwt_signer import JWTSigner
timeout = conf.getint("webserver", "log_fetch_timeout_sec", fallback=None)
@@ -170,6 +169,9 @@ class FileTaskHandler(logging.Handler):
"""
trigger_should_wrap = True
+ inherits_from_empty_operator_log_message = (
+ "Operator inherits from empty operator and thus does not have logs"
+ )
def __init__(self, base_log_folder: str, filename_template: str | None =
None):
super().__init__()
@@ -555,8 +557,11 @@ class FileTaskHandler(logging.Handler):
messages.append(f"Found logs served from host {url}")
logs.append(response.text)
except Exception as e:
- messages.append(f"Could not read served logs: {e}")
- logger.exception("Could not read served logs")
+ if isinstance(e, httpx.UnsupportedProtocol) and
ti.task.inherits_from_empty_operator is True:
+ messages.append(self.inherits_from_empty_operator_log_message)
+ else:
+ messages.append(f"Could not read served logs: {e}")
+ logger.exception("Could not read served logs")
return messages, logs
def _read_remote_logs(self, ti, try_number, metadata=None) ->
tuple[list[str], list[str]]: