This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 27240e68de9 Add stream method to RemoteIO (#54813)
27240e68de9 is described below

commit 27240e68de90c631c98277a22e8da4c6739af3e9
Author: LIU ZHE YOU <[email protected]>
AuthorDate: Wed Nov 12 11:06:31 2025 +0800

    Add stream method to RemoteIO (#54813)
    
    * Add stream method to RemoteIO
    
    * Import BaseOperator from version_compat
    
    * Skip Airflow 2 compat test for RemoteIO
    
    * Add a new Protocol for .stream method, check attr at runtime
    
    * Remove NotImplementedError
    
    * Clarify naming for log response types
    
    * Fix comment for LogMessages
    
    * Try to make mypy happy
    
    * Remove redundant tests
    
    * Check hasattr before getattr
    
    * Fix nits for using getattr
---
 .../config_templates/airflow_local_settings.py       |  4 ++--
 airflow-core/src/airflow/logging/remote.py           | 15 ++++++++++++---
 .../src/airflow/utils/log/file_task_handler.py       | 20 ++++++++++++--------
 .../unit/alibaba/cloud/log/test_oss_task_handler.py  |  2 +-
 4 files changed, 27 insertions(+), 14 deletions(-)

diff --git 
a/airflow-core/src/airflow/config_templates/airflow_local_settings.py 
b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
index 6270fba7ba3..e658531fcde 100644
--- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
+++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -27,7 +27,7 @@ from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.logging_config import RemoteLogIO
+    from airflow.logging_config import RemoteLogIO, RemoteLogStreamIO
 
 LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()
 
@@ -119,7 +119,7 @@ if EXTRA_LOGGER_NAMES:
 ##################
 
 REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")
-REMOTE_TASK_LOG: RemoteLogIO | None = None
+REMOTE_TASK_LOG: RemoteLogIO | RemoteLogStreamIO | None = None
 DEFAULT_REMOTE_CONN_ID: str | None = None
 
 
diff --git a/airflow-core/src/airflow/logging/remote.py 
b/airflow-core/src/airflow/logging/remote.py
index ec5ab70ab20..5f60fc6ffff 100644
--- a/airflow-core/src/airflow/logging/remote.py
+++ b/airflow-core/src/airflow/logging/remote.py
@@ -18,13 +18,13 @@
 from __future__ import annotations
 
 import os
-from typing import TYPE_CHECKING, Protocol
+from typing import TYPE_CHECKING, Protocol, runtime_checkable
 
 if TYPE_CHECKING:
     import structlog.typing
 
     from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
-    from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo
+    from airflow.utils.log.file_task_handler import LogResponse, 
StreamingLogResponse
 
 
 class RemoteLogIO(Protocol):
@@ -44,6 +44,15 @@ class RemoteLogIO(Protocol):
         """Upload the given log path to the remote storage."""
         ...
 
-    def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, 
LogMessages | None]:
+    def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse:
         """Read logs from the given remote log path."""
         ...
+
+
+@runtime_checkable
+class RemoteLogStreamIO(RemoteLogIO, Protocol):
+    """Interface for remote task loggers with stream-based read support."""
+
+    def stream(self, relative_path: str, ti: RuntimeTI) -> 
StreamingLogResponse:
+        """Stream-based read interface for reading logs from the given remote 
log path."""
+        ...
diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py 
b/airflow-core/src/airflow/utils/log/file_task_handler.py
index 606c7369a1a..44ccf2cde38 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -68,16 +68,15 @@ HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
 
 # These types are similar, but have distinct names to make processing them 
less error prone
 LogMessages: TypeAlias = list[str]
-"""The legacy format of log messages before 3.0.2"""
+"""The legacy format of log messages before 3.0.4"""
 LogSourceInfo: TypeAlias = list[str]
 """Information _about_ the log fetching process for display to a user"""
 RawLogStream: TypeAlias = Generator[str, None, None]
 """Raw log stream, containing unparsed log lines."""
-LegacyLogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages]
+LogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages | None]
 """Legacy log response, containing source information and log messages."""
-LogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
-LogResponseWithSize: TypeAlias = tuple[LogSourceInfo, list[RawLogStream], int]
-"""Log response, containing source information, stream of log lines, and total 
log size."""
+StreamingLogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
+"""Streaming log response, containing source information, stream of log 
lines."""
 StructuredLogStream: TypeAlias = Generator["StructuredLogMessage", None, None]
 """Structured log stream, containing structured log messages."""
 LogHandlerOutputStream: TypeAlias = (
@@ -856,7 +855,7 @@ class FileTaskHandler(logging.Handler):
     @staticmethod
     def _read_from_local(
         worker_log_path: Path,
-    ) -> LogResponse:
+    ) -> StreamingLogResponse:
         sources: LogSourceInfo = []
         log_streams: list[RawLogStream] = []
         paths = sorted(worker_log_path.parent.glob(worker_log_path.name + "*"))
@@ -873,7 +872,7 @@ class FileTaskHandler(logging.Handler):
         self,
         ti: TaskInstance | TaskInstanceHistory,
         worker_log_rel_path: str,
-    ) -> LogResponse:
+    ) -> StreamingLogResponse:
         sources: LogSourceInfo = []
         log_streams: list[RawLogStream] = []
         try:
@@ -911,7 +910,7 @@ class FileTaskHandler(logging.Handler):
                 logger.exception("Could not read served logs")
         return sources, log_streams
 
-    def _read_remote_logs(self, ti, try_number, metadata=None) -> 
LegacyLogResponse | LogResponse:
+    def _read_remote_logs(self, ti, try_number, metadata=None) -> LogResponse 
| StreamingLogResponse:
         """
         Implement in subclasses to read from the remote service.
 
@@ -936,5 +935,10 @@ class FileTaskHandler(logging.Handler):
         # This living here is not really a good plan, but it just about works 
for now.
         # Ideally we move all the read+combine logic in to TaskLogReader and 
out of the task handler.
         path = self._render_filename(ti, try_number)
+        if stream_method := getattr(remote_io, "stream", None):
+            # Use .stream interface if provider's RemoteIO supports it
+            sources, logs = stream_method(path, ti)
+            return sources, logs or []
+        # Fallback to .read interface
         sources, logs = remote_io.read(path, ti)
         return sources, logs or []
diff --git 
a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py 
b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py
index 56c65a7c7d7..1fcc9be4f89 100644
--- a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py
+++ b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py
@@ -24,7 +24,7 @@ from unittest.mock import PropertyMock
 
 import pytest
 
-from airflow.providers.alibaba.cloud.log.oss_task_handler import 
OSSRemoteLogIO, OSSTaskHandler  # noqa: F401
+from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSTaskHandler
 from airflow.utils.state import TaskInstanceState
 from airflow.utils.timezone import datetime
 

Reply via email to