jason810496 commented on code in PR #49470:
URL: https://github.com/apache/airflow/pull/49470#discussion_r2160346872


##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -19,51 +19,111 @@
 
 from __future__ import annotations
 
-import itertools
+import heapq
+import io
 import logging
 import os
-from collections.abc import Iterable
+from collections.abc import Generator, Iterator
 from contextlib import suppress
 from datetime import datetime
 from enum import Enum
+from itertools import chain
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, Union
+from types import GeneratorType
+from typing import IO, TYPE_CHECKING, Callable, Optional, TypedDict, Union, 
cast
 from urllib.parse import urljoin
 
 import pendulum
 from pydantic import BaseModel, ConfigDict, ValidationError
+from typing_extensions import NotRequired
 
 from airflow.configuration import conf
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.utils.helpers import parse_template_string, render_template
+from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
 from airflow.utils.log.logging_mixin import SetContextPropagate
 from airflow.utils.log.non_caching_file_handler import 
NonCachingRotatingFileHandler
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import State, TaskInstanceState
 
 if TYPE_CHECKING:
+    from requests import Response
+
     from airflow.executors.base_executor import BaseExecutor
     from airflow.models.taskinstance import TaskInstance
     from airflow.models.taskinstancehistory import TaskInstanceHistory
     from airflow.typing_compat import TypeAlias
 
+CHUNK_SIZE = 1024 * 1024 * 5  # 5MB
+DEFAULT_SORT_DATETIME = pendulum.datetime(2000, 1, 1)
+DEFAULT_SORT_TIMESTAMP = int(DEFAULT_SORT_DATETIME.timestamp() * 1000)
+SORT_KEY_OFFSET = 10000000
+"""An offset used by the _create_sort_key utility.
+
+Assuming 50 characters per line, an offset of 10,000,000 can represent 
approximately 500 MB of file data, which is sufficient for use as a constant.
+"""
+HEAP_DUMP_SIZE = 5000
+HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
 
 # These types are similar, but have distinct names to make processing them 
less error prone
-LogMessages: TypeAlias = Union[list["StructuredLogMessage"], list[str]]
-"""The log messages themselves, either in already sturcutured form, or a 
single string blob to be parsed later"""
+LogMessages: TypeAlias = list[str]
+"""The legacy format of log messages, represented as a single string blob to 
be parsed later."""
 LogSourceInfo: TypeAlias = list[str]
 """Information _about_ the log fetching process for display to a user"""
-LogMetadata: TypeAlias = dict[str, Any]
+RawLogStream: TypeAlias = Generator[str, None, None]
+"""Raw log stream, containing unparsed log lines."""
+LegacyLogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages]
+"""Legacy log response, containing source information and log messages."""
+LogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
+LogResponseWithSize: TypeAlias = tuple[LogSourceInfo, list[RawLogStream], int]
+"""Log response, containing source information, stream of log lines, and total 
log size."""
+StructuredLogStream: TypeAlias = Generator["StructuredLogMessage", None, None]
+"""Structured log stream, containing structured log messages."""
+LogHandlerOutputStream: TypeAlias = Union[
+    StructuredLogStream, Iterator["StructuredLogMessage"], 
chain["StructuredLogMessage"]
+]
+"""Output stream, containing structured log messages or a chain of them."""
+ParsedLog: TypeAlias = tuple[Optional[datetime], int, "StructuredLogMessage"]
+"""Parsed log record, containing timestamp, line_num and the structured log 
message."""
+ParsedLogStream: TypeAlias = Generator[ParsedLog, None, None]
+LegacyProvidersLogType: TypeAlias = Union[list["StructuredLogMessage"], str, 
list[str]]
+"""Return type used by legacy `_read` methods for Alibaba Cloud, 
Elasticsearch, OpenSearch, and Redis log handlers.
+
+- For Elasticsearch and OpenSearch: returns either a list of structured log 
messages.
+- For Alibaba Cloud: returns a string.
+- For Redis: returns a list of strings.
+"""
+
 
 logger = logging.getLogger(__name__)
 
 
+class LogMetadata(TypedDict):
+    """Metadata about the log fetching process, including `end_of_log` and 
`log_pos`."""
+
+    end_of_log: bool
+    log_pos: NotRequired[int]
+    # the following attributes are used for Elasticsearch and OpenSearch log 
handlers
+    offset: NotRequired[str | int]

Review Comment:
   This is a special one related to `OpensearchTaskHandler` and 
`ElasticsearchTaskHandler`. It is not related to backward compatibility; it's 
because `Large offset numbers will get JSON.parsed incorrectly on the client. 
Sending as a string prevents this issue.`
   
   I have copied the comment back as well.



##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -19,51 +19,111 @@
 
 from __future__ import annotations
 
-import itertools
+import heapq
+import io
 import logging
 import os
-from collections.abc import Iterable
+from collections.abc import Generator, Iterator
 from contextlib import suppress
 from datetime import datetime
 from enum import Enum
+from itertools import chain
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, Union
+from types import GeneratorType
+from typing import IO, TYPE_CHECKING, Callable, Optional, TypedDict, Union, 
cast
 from urllib.parse import urljoin
 
 import pendulum
 from pydantic import BaseModel, ConfigDict, ValidationError
+from typing_extensions import NotRequired
 
 from airflow.configuration import conf
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.utils.helpers import parse_template_string, render_template
+from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
 from airflow.utils.log.logging_mixin import SetContextPropagate
 from airflow.utils.log.non_caching_file_handler import 
NonCachingRotatingFileHandler
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import State, TaskInstanceState
 
 if TYPE_CHECKING:
+    from requests import Response
+
     from airflow.executors.base_executor import BaseExecutor
     from airflow.models.taskinstance import TaskInstance
     from airflow.models.taskinstancehistory import TaskInstanceHistory
     from airflow.typing_compat import TypeAlias
 
+CHUNK_SIZE = 1024 * 1024 * 5  # 5MB
+DEFAULT_SORT_DATETIME = pendulum.datetime(2000, 1, 1)
+DEFAULT_SORT_TIMESTAMP = int(DEFAULT_SORT_DATETIME.timestamp() * 1000)
+SORT_KEY_OFFSET = 10000000
+"""An offset used by the _create_sort_key utility.
+
+Assuming 50 characters per line, an offset of 10,000,000 can represent 
approximately 500 MB of file data, which is sufficient for use as a constant.
+"""
+HEAP_DUMP_SIZE = 5000
+HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
 
 # These types are similar, but have distinct names to make processing them 
less error prone
-LogMessages: TypeAlias = Union[list["StructuredLogMessage"], list[str]]
-"""The log messages themselves, either in already sturcutured form, or a 
single string blob to be parsed later"""
+LogMessages: TypeAlias = list[str]
+"""The legacy format of log messages, represented as a single string blob to 
be parsed later."""
 LogSourceInfo: TypeAlias = list[str]
 """Information _about_ the log fetching process for display to a user"""
-LogMetadata: TypeAlias = dict[str, Any]
+RawLogStream: TypeAlias = Generator[str, None, None]
+"""Raw log stream, containing unparsed log lines."""
+LegacyLogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages]
+"""Legacy log response, containing source information and log messages."""
+LogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
+LogResponseWithSize: TypeAlias = tuple[LogSourceInfo, list[RawLogStream], int]
+"""Log response, containing source information, stream of log lines, and total 
log size."""
+StructuredLogStream: TypeAlias = Generator["StructuredLogMessage", None, None]
+"""Structured log stream, containing structured log messages."""
+LogHandlerOutputStream: TypeAlias = Union[
+    StructuredLogStream, Iterator["StructuredLogMessage"], 
chain["StructuredLogMessage"]
+]
+"""Output stream, containing structured log messages or a chain of them."""
+ParsedLog: TypeAlias = tuple[Optional[datetime], int, "StructuredLogMessage"]
+"""Parsed log record, containing timestamp, line_num and the structured log 
message."""
+ParsedLogStream: TypeAlias = Generator[ParsedLog, None, None]
+LegacyProvidersLogType: TypeAlias = Union[list["StructuredLogMessage"], str, 
list[str]]
+"""Return type used by legacy `_read` methods for Alibaba Cloud, 
Elasticsearch, OpenSearch, and Redis log handlers.
+
+- For Elasticsearch and OpenSearch: returns either a list of structured log 
messages.
+- For Alibaba Cloud: returns a string.
+- For Redis: returns a list of strings.
+"""
+
 
 logger = logging.getLogger(__name__)
 
 
+class LogMetadata(TypedDict):
+    """Metadata about the log fetching process, including `end_of_log` and 
`log_pos`."""
+
+    end_of_log: bool
+    log_pos: NotRequired[int]
+    # the following attributes are used for Elasticsearch and OpenSearch log 
handlers
+    offset: NotRequired[str | int]

Review Comment:
   This is a special one related to `OpensearchTaskHandler` and 
`ElasticsearchTaskHandler`. It is not related to backward compatibility; it's 
because `Large offset numbers will get JSON.parsed incorrectly on the client. 
Sending as a string prevents this issue.`
   
   I have copied the comment back as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to