This is an automated email from the ASF dual-hosted git repository.
jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 27240e68de9 Add stream method to RemoteIO (#54813)
27240e68de9 is described below
commit 27240e68de90c631c98277a22e8da4c6739af3e9
Author: LIU ZHE YOU <[email protected]>
AuthorDate: Wed Nov 12 11:06:31 2025 +0800
Add stream method to RemoteIO (#54813)
* Add stream method to RemoteIO
* Import BaseOperator from version_compat
* Skip Airflow 2 compat test for RemoteIO
* Add a new Protocol for .stream method, check attr at runtime
* Remove NotImplementedError
* Clarify naming for log response types
* Fix comment for LogMessages
* Try to make mypy happy
* Remove redundant tests
* Check hasattr before getattr
* Fix nits for using getattr
---
.../config_templates/airflow_local_settings.py | 4 ++--
airflow-core/src/airflow/logging/remote.py | 15 ++++++++++++---
.../src/airflow/utils/log/file_task_handler.py | 20 ++++++++++++--------
.../unit/alibaba/cloud/log/test_oss_task_handler.py | 2 +-
4 files changed, 27 insertions(+), 14 deletions(-)
diff --git
a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
index 6270fba7ba3..e658531fcde 100644
--- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
+++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -27,7 +27,7 @@ from airflow.configuration import conf
from airflow.exceptions import AirflowException
if TYPE_CHECKING:
- from airflow.logging_config import RemoteLogIO
+ from airflow.logging_config import RemoteLogIO, RemoteLogStreamIO
LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()
@@ -119,7 +119,7 @@ if EXTRA_LOGGER_NAMES:
##################
REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")
-REMOTE_TASK_LOG: RemoteLogIO | None = None
+REMOTE_TASK_LOG: RemoteLogIO | RemoteLogStreamIO | None = None
DEFAULT_REMOTE_CONN_ID: str | None = None
diff --git a/airflow-core/src/airflow/logging/remote.py
b/airflow-core/src/airflow/logging/remote.py
index ec5ab70ab20..5f60fc6ffff 100644
--- a/airflow-core/src/airflow/logging/remote.py
+++ b/airflow-core/src/airflow/logging/remote.py
@@ -18,13 +18,13 @@
from __future__ import annotations
import os
-from typing import TYPE_CHECKING, Protocol
+from typing import TYPE_CHECKING, Protocol, runtime_checkable
if TYPE_CHECKING:
import structlog.typing
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
- from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo
+ from airflow.utils.log.file_task_handler import LogResponse,
StreamingLogResponse
class RemoteLogIO(Protocol):
@@ -44,6 +44,15 @@ class RemoteLogIO(Protocol):
"""Upload the given log path to the remote storage."""
...
- def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo,
LogMessages | None]:
+ def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse:
"""Read logs from the given remote log path."""
...
+
+
+@runtime_checkable
+class RemoteLogStreamIO(RemoteLogIO, Protocol):
+ """Interface for remote task loggers with stream-based read support."""
+
+ def stream(self, relative_path: str, ti: RuntimeTI) ->
StreamingLogResponse:
+ """Stream-based read interface for reading logs from the given remote
log path."""
+ ...
diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py
b/airflow-core/src/airflow/utils/log/file_task_handler.py
index 606c7369a1a..44ccf2cde38 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -68,16 +68,15 @@ HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
# These types are similar, but have distinct names to make processing them
less error prone
LogMessages: TypeAlias = list[str]
-"""The legacy format of log messages before 3.0.2"""
+"""The legacy format of log messages before 3.0.4"""
LogSourceInfo: TypeAlias = list[str]
"""Information _about_ the log fetching process for display to a user"""
RawLogStream: TypeAlias = Generator[str, None, None]
"""Raw log stream, containing unparsed log lines."""
-LegacyLogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages]
+LogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages | None]
"""Legacy log response, containing source information and log messages."""
-LogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
-LogResponseWithSize: TypeAlias = tuple[LogSourceInfo, list[RawLogStream], int]
-"""Log response, containing source information, stream of log lines, and total
log size."""
+StreamingLogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
+"""Streaming log response, containing source information, stream of log
lines."""
StructuredLogStream: TypeAlias = Generator["StructuredLogMessage", None, None]
"""Structured log stream, containing structured log messages."""
LogHandlerOutputStream: TypeAlias = (
@@ -856,7 +855,7 @@ class FileTaskHandler(logging.Handler):
@staticmethod
def _read_from_local(
worker_log_path: Path,
- ) -> LogResponse:
+ ) -> StreamingLogResponse:
sources: LogSourceInfo = []
log_streams: list[RawLogStream] = []
paths = sorted(worker_log_path.parent.glob(worker_log_path.name + "*"))
@@ -873,7 +872,7 @@ class FileTaskHandler(logging.Handler):
self,
ti: TaskInstance | TaskInstanceHistory,
worker_log_rel_path: str,
- ) -> LogResponse:
+ ) -> StreamingLogResponse:
sources: LogSourceInfo = []
log_streams: list[RawLogStream] = []
try:
@@ -911,7 +910,7 @@ class FileTaskHandler(logging.Handler):
logger.exception("Could not read served logs")
return sources, log_streams
- def _read_remote_logs(self, ti, try_number, metadata=None) ->
LegacyLogResponse | LogResponse:
+ def _read_remote_logs(self, ti, try_number, metadata=None) -> LogResponse
| StreamingLogResponse:
"""
Implement in subclasses to read from the remote service.
@@ -936,5 +935,10 @@ class FileTaskHandler(logging.Handler):
# This living here is not really a good plan, but it just about works
for now.
# Ideally we move all the read+combine logic in to TaskLogReader and
out of the task handler.
path = self._render_filename(ti, try_number)
+ if stream_method := getattr(remote_io, "stream", None):
+ # Use .stream interface if provider's RemoteIO supports it
+ sources, logs = stream_method(path, ti)
+ return sources, logs or []
+ # Fallback to .read interface
sources, logs = remote_io.read(path, ti)
return sources, logs or []
diff --git
a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py
b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py
index 56c65a7c7d7..1fcc9be4f89 100644
--- a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py
+++ b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py
@@ -24,7 +24,7 @@ from unittest.mock import PropertyMock
import pytest
-from airflow.providers.alibaba.cloud.log.oss_task_handler import
OSSRemoteLogIO, OSSTaskHandler # noqa: F401
+from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSTaskHandler
from airflow.utils.state import TaskInstanceState
from airflow.utils.timezone import datetime