ashb commented on code in PR #48788: URL: https://github.com/apache/airflow/pull/48788#discussion_r2036299087
########## providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py: ########## @@ -17,45 +17,102 @@ # under the License. from __future__ import annotations +import logging import os -import pathlib import shutil from functools import cached_property +from pathlib import Path +from typing import TYPE_CHECKING from urllib.parse import urlsplit +import attrs + from airflow.configuration import conf from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin +if TYPE_CHECKING: + from airflow.models.taskinstance import TaskInstance + from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI + from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo + + [email protected](kw_only=True) +class HdfsRemoteLogIO(LoggingMixin): # noqa: D101 + remote_base: str + base_log_folder: Path = attrs.field(converter=Path) + delete_local_copy: bool + + processors = () + + def upload(self, path: os.PathLike | str, ti: RuntimeTI): + """Upload the given log path to the remote storage.""" + path = Path(path) + if path.is_absolute(): + local_loc = path + remote_loc = os.path.join(self.remote_base, path.relative_to(self.base_log_folder)) + else: + local_loc = self.base_log_folder.joinpath(path) + remote_loc = os.path.join(self.remote_base, path) + + if local_loc.is_file(): + self.hook.load_file(local_loc, remote_loc) + if self.delete_local_copy: + shutil.rmtree(os.path.dirname(local_loc)) + + @cached_property + def hook(self): + """Returns WebHDFSHook.""" + return WebHDFSHook(webhdfs_conn_id=conf.get("logging", "REMOTE_LOG_CONN_ID")) + + def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages]: + logs = [] + messages = [] + file_path = os.path.join(self.remote_base, relative_path) + if self.hook.check_for_path(file_path): + logs.append(self.hook.read_file(file_path).decode("utf-8")) 
+ else: + messages.append(f"No logs found on hdfs for ti={ti}") + return messages, logs + class HdfsTaskHandler(FileTaskHandler, LoggingMixin): - """Logging handler to upload and read from HDFS.""" + """ + HdfsTaskHandler is a Python logging handler that handles and reads task instance logs. + + It extends airflow FileTaskHandler and uploads to and reads from HDFS. + """ def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs): super().__init__(base_log_folder) + self.handler: logging.FileHandler | None = None self.remote_base = urlsplit(hdfs_log_folder).path self.log_relative_path = "" self._hook = None self.closed = False self.upload_on_close = True - self.delete_local_copy = kwargs.get( - "delete_local_copy", conf.getboolean("logging", "delete_local_logs") - ) - @cached_property - def hook(self): - """Returns WebHDFSHook.""" - return WebHDFSHook(webhdfs_conn_id=conf.get("logging", "REMOTE_LOG_CONN_ID")) + self.io = HdfsRemoteLogIO( + remote_base=hdfs_log_folder, + base_log_folder=base_log_folder, + delete_local_copy=kwargs.get( + "delete_local_copy", conf.getboolean("logging", "delete_local_logs") + ), + ) - def set_context(self, ti): + def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None: Review Comment: Why is `identifier` added here? Nothing ever calls it with that argument -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
