jason810496 commented on code in PR #49470:
URL: https://github.com/apache/airflow/pull/49470#discussion_r2160346872
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -19,51 +19,111 @@
from __future__ import annotations
-import itertools
+import heapq
+import io
import logging
import os
-from collections.abc import Iterable
+from collections.abc import Generator, Iterator
from contextlib import suppress
from datetime import datetime
from enum import Enum
+from itertools import chain
from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, Union
+from types import GeneratorType
+from typing import IO, TYPE_CHECKING, Callable, Optional, TypedDict, Union,
cast
from urllib.parse import urljoin
import pendulum
from pydantic import BaseModel, ConfigDict, ValidationError
+from typing_extensions import NotRequired
from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.helpers import parse_template_string, render_template
+from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import
NonCachingRotatingFileHandler
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import State, TaskInstanceState
if TYPE_CHECKING:
+ from requests import Response
+
from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.typing_compat import TypeAlias
+CHUNK_SIZE = 1024 * 1024 * 5 # 5MB
+DEFAULT_SORT_DATETIME = pendulum.datetime(2000, 1, 1)
+DEFAULT_SORT_TIMESTAMP = int(DEFAULT_SORT_DATETIME.timestamp() * 1000)
+SORT_KEY_OFFSET = 10000000
+"""An offset used by the _create_sort_key utility.
+
+Assuming 50 characters per line, an offset of 10,000,000 can represent
approximately 500 MB of file data, which is sufficient for use as a constant.
+"""
+HEAP_DUMP_SIZE = 5000
+HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
# These types are similar, but have distinct names to make processing them
less error prone
-LogMessages: TypeAlias = Union[list["StructuredLogMessage"], list[str]]
-"""The log messages themselves, either in already sturcutured form, or a
single string blob to be parsed later"""
+LogMessages: TypeAlias = list[str]
+"""The legacy format of log messages, represented as a single string blob to
be parsed later."""
LogSourceInfo: TypeAlias = list[str]
"""Information _about_ the log fetching process for display to a user"""
-LogMetadata: TypeAlias = dict[str, Any]
+RawLogStream: TypeAlias = Generator[str, None, None]
+"""Raw log stream, containing unparsed log lines."""
+LegacyLogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages]
+"""Legacy log response, containing source information and log messages."""
+LogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
+LogResponseWithSize: TypeAlias = tuple[LogSourceInfo, list[RawLogStream], int]
+"""Log response, containing source information, stream of log lines, and total
log size."""
+StructuredLogStream: TypeAlias = Generator["StructuredLogMessage", None, None]
+"""Structured log stream, containing structured log messages."""
+LogHandlerOutputStream: TypeAlias = Union[
+ StructuredLogStream, Iterator["StructuredLogMessage"],
chain["StructuredLogMessage"]
+]
+"""Output stream, containing structured log messages or a chain of them."""
+ParsedLog: TypeAlias = tuple[Optional[datetime], int, "StructuredLogMessage"]
+"""Parsed log record, containing timestamp, line_num and the structured log
message."""
+ParsedLogStream: TypeAlias = Generator[ParsedLog, None, None]
+LegacyProvidersLogType: TypeAlias = Union[list["StructuredLogMessage"], str,
list[str]]
+"""Return type used by legacy `_read` methods for Alibaba Cloud,
Elasticsearch, OpenSearch, and Redis log handlers.
+
+- For Elasticsearch and OpenSearch: returns either a list of structured log
messages.
+- For Alibaba Cloud: returns a string.
+- For Redis: returns a list of strings.
+"""
+
logger = logging.getLogger(__name__)
+class LogMetadata(TypedDict):
+ """Metadata about the log fetching process, including `end_of_log` and
`log_pos`."""
+
+ end_of_log: bool
+ log_pos: NotRequired[int]
+ # the following attributes are used for Elasticsearch and OpenSearch log
handlers
+ offset: NotRequired[str | int]
Review Comment:
This is a special one related to `OpensearchTaskHandler` and
`ElasticsearchTaskHandler`. It is not related to backward compatibility; it's
because `Large offset numbers will get JSON.parsed incorrectly on the client.
Sending as a string prevents this issue.`
I have copied the comment back as well.
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -19,51 +19,111 @@
from __future__ import annotations
-import itertools
+import heapq
+import io
import logging
import os
-from collections.abc import Iterable
+from collections.abc import Generator, Iterator
from contextlib import suppress
from datetime import datetime
from enum import Enum
+from itertools import chain
from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, Union
+from types import GeneratorType
+from typing import IO, TYPE_CHECKING, Callable, Optional, TypedDict, Union,
cast
from urllib.parse import urljoin
import pendulum
from pydantic import BaseModel, ConfigDict, ValidationError
+from typing_extensions import NotRequired
from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.helpers import parse_template_string, render_template
+from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import
NonCachingRotatingFileHandler
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import State, TaskInstanceState
if TYPE_CHECKING:
+ from requests import Response
+
from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.typing_compat import TypeAlias
+CHUNK_SIZE = 1024 * 1024 * 5 # 5MB
+DEFAULT_SORT_DATETIME = pendulum.datetime(2000, 1, 1)
+DEFAULT_SORT_TIMESTAMP = int(DEFAULT_SORT_DATETIME.timestamp() * 1000)
+SORT_KEY_OFFSET = 10000000
+"""An offset used by the _create_sort_key utility.
+
+Assuming 50 characters per line, an offset of 10,000,000 can represent
approximately 500 MB of file data, which is sufficient for use as a constant.
+"""
+HEAP_DUMP_SIZE = 5000
+HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
# These types are similar, but have distinct names to make processing them
less error prone
-LogMessages: TypeAlias = Union[list["StructuredLogMessage"], list[str]]
-"""The log messages themselves, either in already sturcutured form, or a
single string blob to be parsed later"""
+LogMessages: TypeAlias = list[str]
+"""The legacy format of log messages, represented as a single string blob to
be parsed later."""
LogSourceInfo: TypeAlias = list[str]
"""Information _about_ the log fetching process for display to a user"""
-LogMetadata: TypeAlias = dict[str, Any]
+RawLogStream: TypeAlias = Generator[str, None, None]
+"""Raw log stream, containing unparsed log lines."""
+LegacyLogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages]
+"""Legacy log response, containing source information and log messages."""
+LogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
+LogResponseWithSize: TypeAlias = tuple[LogSourceInfo, list[RawLogStream], int]
+"""Log response, containing source information, stream of log lines, and total
log size."""
+StructuredLogStream: TypeAlias = Generator["StructuredLogMessage", None, None]
+"""Structured log stream, containing structured log messages."""
+LogHandlerOutputStream: TypeAlias = Union[
+ StructuredLogStream, Iterator["StructuredLogMessage"],
chain["StructuredLogMessage"]
+]
+"""Output stream, containing structured log messages or a chain of them."""
+ParsedLog: TypeAlias = tuple[Optional[datetime], int, "StructuredLogMessage"]
+"""Parsed log record, containing timestamp, line_num and the structured log
message."""
+ParsedLogStream: TypeAlias = Generator[ParsedLog, None, None]
+LegacyProvidersLogType: TypeAlias = Union[list["StructuredLogMessage"], str,
list[str]]
+"""Return type used by legacy `_read` methods for Alibaba Cloud,
Elasticsearch, OpenSearch, and Redis log handlers.
+
+- For Elasticsearch and OpenSearch: returns either a list of structured log
messages.
+- For Alibaba Cloud: returns a string.
+- For Redis: returns a list of strings.
+"""
+
logger = logging.getLogger(__name__)
+class LogMetadata(TypedDict):
+ """Metadata about the log fetching process, including `end_of_log` and
`log_pos`."""
+
+ end_of_log: bool
+ log_pos: NotRequired[int]
+ # the following attributes are used for Elasticsearch and OpenSearch log
handlers
+ offset: NotRequired[str | int]
Review Comment:
This is a special one related to `OpensearchTaskHandler` and
`ElasticsearchTaskHandler`. It is not related to backward compatibility; it's
because `Large offset numbers will get JSON.parsed incorrectly on the client.
Sending as a string prevents this issue.`
I have copied the comment back as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]