Lee-W commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1957799020


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -159,6 +264,70 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
     return val
 
 
+def _get_compatible_parse_log_streams(remote_logs: list[str]) -> list[_ParsedLogStreamType]:
+    """
+    Compatibility utility for new log reading (stream-based + k-way merge) and old log reading (read whole log in memory + sorting).
+
+    Turn old log reading into new stream-based log reading.
+    Will be removed after all providers adapt to stream-based log reading.
+
+    :param remote_logs: list of log lines
+    :return: parsed log streams if remote_logs is not empty, otherwise empty list
+    """
+    if not remote_logs:
+        # empty remote logs
+        return []
+
+    def _parse_log(logs: list[str]):
+        timestamp = None
+        next_timestamp = None
+        for line_num, line in enumerate(logs):
+            with suppress(Exception):
+                # next_timestamp unchanged if line can't be parsed
+                next_timestamp = _parse_timestamp(line)
+            if next_timestamp:
+                timestamp = next_timestamp
+            yield timestamp, line_num, line
+
+    return [_parse_log(remote_logs)]
+
+
+def _get_compatible_read_for_providers(read_response: tuple) -> tuple[Iterable[str], dict[str, Any]]:
+    """
+    Compatibility utility for transforming the `_read` method's return value for providers.
+
+    Provider methods' return type might be:
+    - `tuple[str,dict[str,Any]]`
+        - alibaba.cloud.log.oss_task_handler.OssTaskHandler
+        - amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler
+        - redis.log.redis_task_handler.RedisTaskHandler
+    - `tuple[list[tuple[str,str]],dict[str,Any]]` ( tuple[list[host,log_documents],metadata] )
+        - For this case, we need to split host and log_documents and put host into metadata
+        - elasticsearch.log.es_task_handler.ElasticsearchTaskHandler
+        - opensearch.log.os_task_handler.OpenSearchTaskHandler
+    """
+    if len(read_response) != 2:
+        raise ValueError("Unexpected return value from _read")
+    # for tuple[str,dict[str,Any]]
+    if isinstance(read_response[0], str):
+        log_str, metadata = read_response
+        return (log_str.splitlines(), metadata)
+
+    # for tuple[list[tuple[str,str]],dict[str,Any]]
+    if isinstance(read_response[0], list):
+        host_by_logs, metadata = read_response
+        if len(host_by_logs) > 0:

Review Comment:
   ```suggestion
           if len(host_by_logs):
   ```
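
   For reference, the bare truthiness check is equivalent here, and it is the form PEP 8 recommends for sequences; a minimal sketch:

   ```python
   # Equivalent check: empty lists are falsy in Python.
   if host_by_logs:
       metadata["host"] = host_by_logs[0][0]
   ```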



##########
airflow/executors/base_executor.py:
##########
@@ -569,13 +569,20 @@ def execute_async(
         """
         raise NotImplementedError()
 
-    def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
+    def get_task_log(
+        self, ti: TaskInstance, try_number: int
+    ) -> (
+        tuple[list[str], list[Generator[tuple[pendulum.DateTime | None, int, str], None, None]], int]
+        | tuple[list[str], list[str]]

Review Comment:
   Should we make it a new type? TBH, this is a bit hard to read 🤔
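
   One possible shape for such an alias, as a sketch (alias names are hypothetical; `pendulum`, `Generator`, and `TaskInstance` are assumed to be imported already):

   ```python
   # Hypothetical aliases that make the union readable.
   ParsedLogRecord = tuple[pendulum.DateTime | None, int, str]
   ParsedLogStream = Generator[ParsedLogRecord, None, None]

   # (method of BaseExecutor)
   def get_task_log(
       self, ti: TaskInstance, try_number: int
   ) -> tuple[list[str], list[ParsedLogStream], int] | tuple[list[str], list[str]]:
       ...
   ```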



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -159,6 +264,70 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
     return val
 
 
+def _get_compatible_parse_log_streams(remote_logs: list[str]) -> list[_ParsedLogStreamType]:
+    """
+    Compatibility utility for new log reading (stream-based + k-way merge) and old log reading (read whole log in memory + sorting).
+
+    Turn old log reading into new stream-based log reading.
+    Will be removed after all providers adapt to stream-based log reading.
+
+    :param remote_logs: list of log lines
+    :return: parsed log streams if remote_logs is not empty, otherwise empty list
+    """
+    if not remote_logs:
+        # empty remote logs
+        return []
+
+    def _parse_log(logs: list[str]):

Review Comment:
   Missing return type.
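
   For example, reusing the alias this module already defines (assuming `_ParsedLogStreamType` describes the `(timestamp, line_num, line)` generator):

   ```python
   def _parse_log(logs: list[str]) -> _ParsedLogStreamType:
   ```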



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -439,43 +643,60 @@ def _get_log_retrieval_url(
             log_relative_path,
         )
 
-    def read(self, task_instance, try_number=None, metadata=None):
+    def read(
+        self, task_instance, try_number=None, metadata=None
+    ) -> tuple[list[str], list[Generator[str, None, None]], list[dict[str, Any]]]:
         """
         Read logs of given task instance from local machine.
 
         :param task_instance: task instance object
         :param try_number: task instance try_number to read logs from. If None
                            it returns all logs separated by try_number
         :param metadata: log metadata, can be used for streaming log reading and auto-tailing.
-        :return: a list of listed tuples which order log string by host
+        :return: tuple of hosts, log streams, and metadata_array
         """
         # Task instance increments its try number when it starts to run.
         # So the log for a particular task try will only show up when
         # try number gets incremented in DB, i.e logs produced the time
         # after cli run and before try_number + 1 in DB will not be displayed.
+        try_numbers: list
         if try_number is None:
             next_try = task_instance.try_number + 1
             try_numbers = list(range(1, next_try))
         elif try_number < 1:
-            logs = [
-                [("default_host", f"Error fetching the logs. Try number {try_number} is invalid.")],
-            ]
-            return logs, [{"end_of_log": True}]
+            error_logs = [(log for log in [f"Error fetching the logs. Try number {try_number} is invalid."])]
+            return ["default_host"], error_logs, [{"end_of_log": True}]
         else:
             try_numbers = [try_number]
 
-        logs = [""] * len(try_numbers)
-        metadata_array = [{}] * len(try_numbers)
+        hosts = [""] * len(try_numbers)

Review Comment:
   If `len(try_numbers)` will not change, we can assign it to a variable.
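
   A minimal sketch of that suggestion (the variable name is illustrative):

   ```python
   # Compute the length once and reuse it.
   num_tries = len(try_numbers)
   hosts = [""] * num_tries
   ```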



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -439,43 +643,60 @@ def _get_log_retrieval_url(
             log_relative_path,
         )
 
-    def read(self, task_instance, try_number=None, metadata=None):
+    def read(
+        self, task_instance, try_number=None, metadata=None

Review Comment:
   ```suggestion
           self, task_instance: TaskInstance, try_number: int | None = None, metadata: dict[str, Any] | None = None
   ```



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -352,47 +521,64 @@ def _read(
         # is needed to get correct log path.
         worker_log_rel_path = self._render_filename(ti, try_number)
         messages_list: list[str] = []
-        remote_logs: list[str] = []
-        local_logs: list[str] = []
+        remote_parsed_logs: list[_ParsedLogStreamType] = []
+        remote_logs_size = 0

Review Comment:
   Some of the following logic could become methods of this newly created class.
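
   A rough sketch of that idea, keeping the accumulated state next to the logic that mutates it (class and method names are hypothetical; `_ParsedLogStreamType` and `_LogSourceType` are the module's existing aliases):

   ```python
   class _GatheredLogs:
       """Accumulates messages, parsed log streams, and sizes per log source."""

       def __init__(self) -> None:
           self.messages: list[str] = []
           self.parsed_log_streams: list[_ParsedLogStreamType] = []
           self.total_size = 0

       def add_source(self, source: _LogSourceType) -> None:
           # Each source returns (messages, parsed_log_streams, total_size).
           messages, streams, size = source
           self.messages.extend(messages)
           self.parsed_log_streams.extend(streams)
           self.total_size += size
   ```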



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -402,11 +588,29 @@ def _read(
             TaskInstanceState.RUNNING,
             TaskInstanceState.DEFERRED,
         )
+
+        current_total_logs_size = local_logs_size + remote_logs_size + executor_logs_size + served_logs_size
+        interleave_log_stream = _interleave_logs(
+            *local_parsed_logs,
+            *remote_parsed_logs,
+            *(executor_parsed_logs or []),
+            *served_parsed_logs,
+        )
+
+        # skip log stream until the last position
         if metadata and "log_pos" in metadata:
-            previous_chars = metadata["log_pos"]
-            logs = logs[previous_chars:]  # Cut off previously passed log test as new tail
-        out_message = logs if "log_pos" in (metadata or {}) else messages + logs
-        return out_message, {"end_of_log": end_of_log, "log_pos": log_pos}
+            offset = metadata["log_pos"]
+            for _ in range(offset):
+                next(interleave_log_stream, None)
+
+        out_stream: Iterable[str]
+        if "log_pos" in (metadata or {}):

Review Comment:
   ```suggestion
           if metadata and "log_pos" in metadata:
   ```
   
   The original logic achieves the same result, but it is probably not the correct logic 🤔
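
   As an aside, the manual `next()` loop that skips to `log_pos` could also be written with `itertools.islice`; a sketch under the same assumptions:

   ```python
   from itertools import islice

   if metadata and "log_pos" in metadata:
       # Drop the lines that were already delivered to the caller.
       interleave_log_stream = islice(interleave_log_stream, metadata["log_pos"], None)
   ```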



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -107,30 +131,111 @@ def _parse_timestamp(line: str):
         return pendulum.parse(timestamp_str.strip("[]"))
 
 
-def _parse_timestamps_in_log_file(lines: Iterable[str]):
-    timestamp = None
-    next_timestamp = None
-    for idx, line in enumerate(lines):
-        if line:
-            with suppress(Exception):
-                # next_timestamp unchanged if line can't be parsed
-                next_timestamp = _parse_timestamp(line)
-            if next_timestamp:
-                timestamp = next_timestamp
-            yield timestamp, idx, line
+def _get_parsed_log_stream(file_path: Path) -> _ParsedLogStreamType:
+    with open(file_path) as f:
+        line_num = 0  # line number for each log line
+        for file_chunk in iter(partial(f.read, CHUNK_SIZE), b""):
+            if not file_chunk:
+                break
+            # parse log lines
+            lines = file_chunk.splitlines()
+            timestamp = None
+            next_timestamp = None
+            for line in lines:
+                if line:

Review Comment:
   Just to make sure: is this to skip empty lines?



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -541,18 +762,51 @@ def _init_file(self, ti, *, identifier: str | None = None):
         return full_path
 
     @staticmethod
-    def _read_from_local(worker_log_path: Path) -> tuple[list[str], list[str]]:
-        messages = []
+    def _read_from_local(worker_log_path: Path) -> _LogSourceType:
+        """
+        Read logs from local file.
+
+        :param worker_log_path: Path to the worker log file
+        :return: Tuple of messages, log streams, total size of logs
+        """
+        total_log_size: int = 0
+        messages: list[str] = []
+        parsed_log_streams: list[_ParsedLogStreamType] = []
         paths = sorted(worker_log_path.parent.glob(worker_log_path.name + "*"))
-        if paths:
-            messages.append("Found local files:")
-            messages.extend(f"  * {x}" for x in paths)
-        logs = [file.read_text() for file in paths]
-        return messages, logs
-
-    def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], list[str]]:
-        messages = []
-        logs = []
+        if not paths:
+            return messages, parsed_log_streams, total_log_size
+
+        messages.append("Found local files:")
+        for path in paths:
+            total_log_size += path.stat().st_size
+            messages.append(f"  * {path}")
+            parsed_log_streams.append(_get_parsed_log_stream(path))
+
+        return messages, parsed_log_streams, total_log_size
+
+    def _read_from_logs_server(self, ti, worker_log_rel_path) -> _LogSourceType:

Review Comment:
   missing type annotation
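
   For instance (assuming `ti` is a `TaskInstance` and the rendered relative path is a plain `str`):

   ```python
   def _read_from_logs_server(self, ti: TaskInstance, worker_log_rel_path: str) -> _LogSourceType:
   ```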



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -352,47 +521,64 @@ def _read(
         # is needed to get correct log path.
         worker_log_rel_path = self._render_filename(ti, try_number)
         messages_list: list[str] = []
-        remote_logs: list[str] = []
-        local_logs: list[str] = []
+        remote_parsed_logs: list[_ParsedLogStreamType] = []
+        remote_logs_size = 0

Review Comment:
   Not sure whether we want to make these attributes, or a dataclass.
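
   If the dataclass route is taken, it might look roughly like this (field names are hypothetical; `_ParsedLogStreamType` is the module's existing alias):

   ```python
   from dataclasses import dataclass, field

   @dataclass
   class _LogSource:
       messages: list[str] = field(default_factory=list)
       parsed_log_streams: list[_ParsedLogStreamType] = field(default_factory=list)
       total_size: int = 0
   ```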



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -541,18 +762,51 @@ def _init_file(self, ti, *, identifier: str | None = None):
         return full_path
 
     @staticmethod
-    def _read_from_local(worker_log_path: Path) -> tuple[list[str], list[str]]:
-        messages = []
+    def _read_from_local(worker_log_path: Path) -> _LogSourceType:
+        """
+        Read logs from local file.
+
+        :param worker_log_path: Path to the worker log file
+        :return: Tuple of messages, log streams, total size of logs
+        """
+        total_log_size: int = 0
+        messages: list[str] = []
+        parsed_log_streams: list[_ParsedLogStreamType] = []
         paths = sorted(worker_log_path.parent.glob(worker_log_path.name + "*"))
-        if paths:
-            messages.append("Found local files:")
-            messages.extend(f"  * {x}" for x in paths)
-        logs = [file.read_text() for file in paths]
-        return messages, logs
-
-    def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], list[str]]:
-        messages = []
-        logs = []
+        if not paths:
+            return messages, parsed_log_streams, total_log_size
+
+        messages.append("Found local files:")
+        for path in paths:
+            total_log_size += path.stat().st_size
+            messages.append(f"  * {path}")
+            parsed_log_streams.append(_get_parsed_log_stream(path))
+
+        return messages, parsed_log_streams, total_log_size
+
+    def _read_from_logs_server(self, ti, worker_log_rel_path) -> _LogSourceType:
+        total_log_size: int = 0
+        messages: list[str] = []
+        parsed_log_streams: list[_ParsedLogStreamType] = []
+
+        def _get_parsed_log_stream_from_response(response):

Review Comment:
   missing type annotation
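
   For instance (treating the response type as an assumption; it would be whatever HTTP client the served-logs fetch returns):

   ```python
   # "Response" is a stand-in for the actual client response type, e.g. httpx.Response.
   def _get_parsed_log_stream_from_response(response: Response) -> _ParsedLogStreamType:
   ```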



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -159,6 +264,70 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
     return val
 
 
+def _get_compatible_parse_log_streams(remote_logs: list[str]) -> list[_ParsedLogStreamType]:
+    """
+    Compatibility utility for new log reading (stream-based + k-way merge) and old log reading (read whole log in memory + sorting).
+
+    Turn old log reading into new stream-based log reading.
+    Will be removed after all providers adapt to stream-based log reading.
+
+    :param remote_logs: list of log lines
+    :return: parsed log streams if remote_logs is not empty, otherwise empty list
+    """
+    if not remote_logs:
+        # empty remote logs
+        return []
+
+    def _parse_log(logs: list[str]):
+        timestamp = None
+        next_timestamp = None
+        for line_num, line in enumerate(logs):
+            with suppress(Exception):
+                # next_timestamp unchanged if line can't be parsed
+                next_timestamp = _parse_timestamp(line)
+            if next_timestamp:
+                timestamp = next_timestamp
+            yield timestamp, line_num, line
+
+    return [_parse_log(remote_logs)]
+
+
+def _get_compatible_read_for_providers(read_response: tuple) -> tuple[Iterable[str], dict[str, Any]]:
+    """
+    Compatibility utility for transforming the `_read` method's return value for providers.
+
+    Provider methods' return type might be:
+    - `tuple[str,dict[str,Any]]`
+        - alibaba.cloud.log.oss_task_handler.OssTaskHandler
+        - amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler
+        - redis.log.redis_task_handler.RedisTaskHandler
+    - `tuple[list[tuple[str,str]],dict[str,Any]]` ( tuple[list[host,log_documents],metadata] )
+        - For this case, we need to split host and log_documents and put host into metadata
+        - elasticsearch.log.es_task_handler.ElasticsearchTaskHandler
+        - opensearch.log.os_task_handler.OpenSearchTaskHandler
+    """
+    if len(read_response) != 2:
+        raise ValueError("Unexpected return value from _read")
+    # for tuple[str,dict[str,Any]]
+    if isinstance(read_response[0], str):
+        log_str, metadata = read_response
+        return (log_str.splitlines(), metadata)
+
+    # for tuple[list[tuple[str,str]],dict[str,Any]]
+    if isinstance(read_response[0], list):
+        host_by_logs, metadata = read_response
+        if len(host_by_logs) > 0:
+            metadata["host"] = host_by_logs[0][0]
+
+        def _host_by_logs_to_log_stream(host_by_logs):

Review Comment:
   ```suggestion
           def _host_by_logs_to_log_stream(host_by_logs: list[str]) -> str:
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
