This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 005292eefce8f03d8b0b0a1c637ba8ca96d19601
Author: Victor Chiapaikeo <[email protected]>
AuthorDate: Fri Jan 12 02:56:54 2024 -0500

    Add log lookup exception for empty op subtypes (#35536)
    
    * Add log lookup exception for empty op subtypes
    
    * Use exception catching approach instead to preserve tests
    
    (cherry picked from commit ddcaef45593a5411859327ab2d16ed648073b986)
---
 airflow/utils/log/file_task_handler.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 3e2561ba75..f86cc77736 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -29,6 +29,7 @@ from pathlib import Path
 from typing import TYPE_CHECKING, Any, Callable, Iterable
 from urllib.parse import urljoin
 
+import httpx
 import pendulum
 
 from airflow.configuration import conf
@@ -78,8 +79,6 @@ def _set_task_deferred_context_var():
 
 
 def _fetch_logs_from_service(url, log_relative_path):
-    import httpx
-
     from airflow.utils.jwt_signer import JWTSigner
 
     timeout = conf.getint("webserver", "log_fetch_timeout_sec", fallback=None)
@@ -170,6 +169,9 @@ class FileTaskHandler(logging.Handler):
     """
 
     trigger_should_wrap = True
+    inherits_from_empty_operator_log_message = (
+        "Operator inherits from empty operator and thus does not have logs"
+    )
 
     def __init__(self, base_log_folder: str, filename_template: str | None = None):
         super().__init__()
@@ -555,8 +557,11 @@ class FileTaskHandler(logging.Handler):
                 messages.append(f"Found logs served from host {url}")
                 logs.append(response.text)
         except Exception as e:
-            messages.append(f"Could not read served logs: {e}")
-            logger.exception("Could not read served logs")
+            if isinstance(e, httpx.UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True:
+                messages.append(self.inherits_from_empty_operator_log_message)
+            else:
+                messages.append(f"Could not read served logs: {e}")
+                logger.exception("Could not read served logs")
         return messages, logs
 
     def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], list[str]]:

Reply via email to