ashb commented on code in PR #48788:
URL: https://github.com/apache/airflow/pull/48788#discussion_r2036299087


##########
providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py:
##########
@@ -17,45 +17,102 @@
 # under the License.
 from __future__ import annotations
 
+import logging
 import os
-import pathlib
 import shutil
 from functools import cached_property
+from pathlib import Path
+from typing import TYPE_CHECKING
 from urllib.parse import urlsplit
 
+import attrs
+
 from airflow.configuration import conf
 from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import LoggingMixin
 
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
+    from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+    from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo
+
+
+@attrs.define(kw_only=True)
+class HdfsRemoteLogIO(LoggingMixin):  # noqa: D101
+    remote_base: str
+    base_log_folder: Path = attrs.field(converter=Path)
+    delete_local_copy: bool
+
+    processors = ()
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Upload the given log path to the remote storage."""
+        path = Path(path)
+        if path.is_absolute():
+            local_loc = path
+            remote_loc = os.path.join(self.remote_base, 
path.relative_to(self.base_log_folder))
+        else:
+            local_loc = self.base_log_folder.joinpath(path)
+            remote_loc = os.path.join(self.remote_base, path)
+
+        if local_loc.is_file():
+            self.hook.load_file(local_loc, remote_loc)
+            if self.delete_local_copy:
+                shutil.rmtree(os.path.dirname(local_loc))
+
+    @cached_property
+    def hook(self):
+        """Returns WebHDFSHook."""
+        return WebHDFSHook(webhdfs_conn_id=conf.get("logging", 
"REMOTE_LOG_CONN_ID"))
+
+    def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, 
LogMessages]:
+        logs = []
+        messages = []
+        file_path = os.path.join(self.remote_base, relative_path)
+        if self.hook.check_for_path(file_path):
+            logs.append(self.hook.read_file(file_path).decode("utf-8"))
+        else:
+            messages.append(f"No logs found on hdfs for ti={ti}")
+        return messages, logs
+
 
 class HdfsTaskHandler(FileTaskHandler, LoggingMixin):
-    """Logging handler to upload and read from HDFS."""
+    """
+    HdfsTaskHandler is a Python logging handler that handles and reads task 
instance logs.
+
+    It extends airflow FileTaskHandler and uploads to and reads from HDFS.
+    """
 
     def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs):
         super().__init__(base_log_folder)
+        self.handler: logging.FileHandler | None = None
         self.remote_base = urlsplit(hdfs_log_folder).path
         self.log_relative_path = ""
         self._hook = None
         self.closed = False
         self.upload_on_close = True
-        self.delete_local_copy = kwargs.get(
-            "delete_local_copy", conf.getboolean("logging", 
"delete_local_logs")
-        )
 
-    @cached_property
-    def hook(self):
-        """Returns WebHDFSHook."""
-        return WebHDFSHook(webhdfs_conn_id=conf.get("logging", 
"REMOTE_LOG_CONN_ID"))
+        self.io = HdfsRemoteLogIO(
+            remote_base=hdfs_log_folder,
+            base_log_folder=base_log_folder,
+            delete_local_copy=kwargs.get(
+                "delete_local_copy", conf.getboolean("logging", 
"delete_local_logs")
+            ),
+        )
 
-    def set_context(self, ti):
+    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) 
-> None:

Review Comment:
   Why is `identifier` added here? Nothing ever calls it with that argument.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to