Lee-W commented on code in PR #49470:
URL: https://github.com/apache/airflow/pull/49470#discussion_r2159887508
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py:
##########
@@ -138,24 +144,24 @@ def get_log(
with contextlib.suppress(TaskNotFound):
ti.task = dag.get_task(ti.task_id)
- if accept == Mimetype.JSON or accept == Mimetype.ANY: # default
- logs, metadata = task_log_reader.read_log_chunks(ti, try_number,
metadata)
- encoded_token = None
+ if accept == Mimetype.NDJSON: # only specified application/x-ndjson will
return streaming response
+ log_stream = task_log_reader.read_log_stream(ti, try_number, metadata)
# type: ignore[arg-type]
+ headers = None
if not metadata.get("end_of_log", False):
- encoded_token =
URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
- return
TaskInstancesLogResponse.model_construct(continuation_token=encoded_token,
content=logs)
- # text/plain, or something else we don't understand. Return raw log content
-
- # We need to exhaust the iterator before we can generate the continuation
token.
- # We could improve this by making it a streaming/async response, and by
then setting the header using
- # HTTP Trailers
- logs = "".join(task_log_reader.read_log_stream(ti, try_number, metadata))
- headers = None
- if not metadata.get("end_of_log", False):
- headers = {
- "Airflow-Continuation-Token":
URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
- }
- return Response(media_type="application/x-ndjson", content=logs,
headers=headers)
+ headers = {
+ "Airflow-Continuation-Token":
URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
+ }
+ return StreamingResponse(media_type="application/x-ndjson",
content=log_stream, headers=headers)
+
+ # application/json, or something else we don't understand.
+ # Return JSON format, which will be more easily for users to debug.
+ structured_log_stream, out_metadata = task_log_reader.read_log_chunks(ti,
try_number, metadata) # type: ignore[arg-type]
+ encoded_token = None
Review Comment:
I feel like we can extract the token logic here as a function, but that can
be done in a following up PR
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -19,51 +19,111 @@
from __future__ import annotations
-import itertools
+import heapq
+import io
import logging
import os
-from collections.abc import Iterable
+from collections.abc import Generator, Iterator
from contextlib import suppress
from datetime import datetime
from enum import Enum
+from itertools import chain
from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, Union
+from types import GeneratorType
+from typing import IO, TYPE_CHECKING, Callable, Optional, TypedDict, Union,
cast
from urllib.parse import urljoin
import pendulum
from pydantic import BaseModel, ConfigDict, ValidationError
+from typing_extensions import NotRequired
from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.helpers import parse_template_string, render_template
+from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import
NonCachingRotatingFileHandler
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import State, TaskInstanceState
if TYPE_CHECKING:
+ from requests import Response
+
from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.typing_compat import TypeAlias
+CHUNK_SIZE = 1024 * 1024 * 5 # 5MB
+DEFAULT_SORT_DATETIME = pendulum.datetime(2000, 1, 1)
+DEFAULT_SORT_TIMESTAMP = int(DEFAULT_SORT_DATETIME.timestamp() * 1000)
+SORT_KEY_OFFSET = 10000000
+"""An offset used by the _create_sort_key utility.
+
+Assuming 50 characters per line, an offset of 10,000,000 can represent
approximately 500 MB of file data, which is sufficient for use as a constant.
+"""
+HEAP_DUMP_SIZE = 5000
+HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
# These types are similar, but have distinct names to make processing them
less error prone
-LogMessages: TypeAlias = Union[list["StructuredLogMessage"], list[str]]
-"""The log messages themselves, either in already sturcutured form, or a
single string blob to be parsed later"""
+LogMessages: TypeAlias = list[str]
+"""The legacy format of log messages, represented as a single string blob to
be parsed later."""
LogSourceInfo: TypeAlias = list[str]
"""Information _about_ the log fetching process for display to a user"""
-LogMetadata: TypeAlias = dict[str, Any]
+RawLogStream: TypeAlias = Generator[str, None, None]
+"""Raw log stream, containing unparsed log lines."""
+LegacyLogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages]
+"""Legacy log response, containing source information and log messages."""
+LogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
+LogResponseWithSize: TypeAlias = tuple[LogSourceInfo, list[RawLogStream], int]
+"""Log response, containing source information, stream of log lines, and total
log size."""
+StructuredLogStream: TypeAlias = Generator["StructuredLogMessage", None, None]
+"""Structured log stream, containing structured log messages."""
+LogHandlerOutputStream: TypeAlias = Union[
+ StructuredLogStream, Iterator["StructuredLogMessage"],
chain["StructuredLogMessage"]
+]
+"""Output stream, containing structured log messages or a chain of them."""
+ParsedLog: TypeAlias = tuple[Optional[datetime], int, "StructuredLogMessage"]
+"""Parsed log record, containing timestamp, line_num and the structured log
message."""
+ParsedLogStream: TypeAlias = Generator[ParsedLog, None, None]
+LegacyProvidersLogType: TypeAlias = Union[list["StructuredLogMessage"], str,
list[str]]
+"""Return type used by legacy `_read` methods for Alibaba Cloud,
Elasticsearch, OpenSearch, and Redis log handlers.
+
+- For Elasticsearch and OpenSearch: returns either a list of structured log
messages.
+- For Alibaba Cloud: returns a string.
+- For Redis: returns a list of strings.
+"""
+
logger = logging.getLogger(__name__)
+class LogMetadata(TypedDict):
+ """Metadata about the log fetching process, including `end_of_log` and
`log_pos`."""
+
+ end_of_log: bool
+ log_pos: NotRequired[int]
+ # the following attributes are used for Elasticsearch and OpenSearch log
handlers
+ offset: NotRequired[str | int]
Review Comment:
It's interesting that it could be a `str` in some cases 🤔 but I guess it's
more for backward compatibiity
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py:
##########
@@ -138,24 +144,24 @@ def get_log(
with contextlib.suppress(TaskNotFound):
ti.task = dag.get_task(ti.task_id)
- if accept == Mimetype.JSON or accept == Mimetype.ANY: # default
- logs, metadata = task_log_reader.read_log_chunks(ti, try_number,
metadata)
- encoded_token = None
+ if accept == Mimetype.NDJSON: # only specified application/x-ndjson will
return streaming response
+ log_stream = task_log_reader.read_log_stream(ti, try_number, metadata)
# type: ignore[arg-type]
Review Comment:
Let's add a comment on why `type: ignore[arg-type]` is needed
##########
airflow-core/src/airflow/utils/log/log_reader.py:
##########
@@ -91,24 +92,19 @@ def read_log_stream(
try_number = ti.try_number
for key in ("end_of_log", "max_offset", "offset", "log_pos"):
- metadata.pop(key, None)
+ #
https://mypy.readthedocs.io/en/stable/typed_dict.html#supported-operations
+ metadata.pop(key, None) # type: ignore[misc]
empty_iterations = 0
while True:
- logs, out_metadata = self.read_log_chunks(ti, try_number, metadata)
- # Update the metadata dict in place so caller can get new
values/end-of-log etc.
-
- for log in logs:
- # It's a bit wasteful here to parse the JSON then dump it back
again.
- # Optimize this so in stream mode we can just pass logs right
through, or even better add
- # support to 307 redirect to a signed URL etc.
Review Comment:
Is this comment still needed?
##########
devel-common/src/tests_common/test_utils/file_task_handler.py:
##########
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import itertools
+from collections.abc import Generator, Iterable
+from datetime import datetime
+from typing import TYPE_CHECKING
+
+import pendulum
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if TYPE_CHECKING:
+ from airflow.utils.log.file_task_handler import ParsedLog,
StructuredLogMessage
+
+
+def events(logs: Iterable[StructuredLogMessage], skip_source_info=True) ->
list[str]:
+ """Helper function to return just the event (a.k.a message) from a list of
StructuredLogMessage"""
+ logs = iter(logs)
+ if skip_source_info:
+
+ def is_source_group(log: StructuredLogMessage):
+ return not hasattr(log, "timestamp") or log.event ==
"::endgroup::" or hasattr(log, "sources")
+
+ logs = itertools.dropwhile(is_source_group, logs)
+
+ return [s.event for s in logs]
+
+
+def convert_list_to_stream(input_list: list[str]) -> Generator[str, None,
None]:
+ """
+ Convert a list of strings to a stream-like object.
+ This function yields each string in the list one by one.
+ """
+ yield from input_list
+
+
+def mock_parsed_logs_factory(
+ event_prefix: str,
+ start_datetime: datetime,
+ count: int,
+) -> list[ParsedLog]:
+ """
+ Create a list of ParsedLog objects with the specified start datetime and
count.
+ Each ParsedLog object contains a timestamp and a list of
StructuredLogMessage objects.
+ """
+ if AIRFLOW_V_3_0_PLUS:
+ from airflow.utils.log.file_task_handler import StructuredLogMessage
+
+ parsed_logs: list[ParsedLog] = []
+ for i in range(count):
+ timestamp: datetime = start_datetime + pendulum.duration(seconds=i)
+ structured_log = StructuredLogMessage(timestamp=None,
event=f"{event_prefix} Event {i}")
+ parsed_logs.append(
+ (
+ timestamp,
+ i,
+ structured_log,
+ )
+ )
Review Comment:
```suggestion
parsed_logs: list[ParsedLog] = [
(
start_datetime + pendulum.duration(seconds=i),
i,
StructuredLogMessage(timestamp=None, event=f"{event_prefix}
Event {i}")
)
for i in range(count):
]
```
I also notice the timestamp is set to `None` here. Is it expected
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +248,147 @@ def _parse_log_lines(
log.timestamp = coerce_datetime(log.timestamp)
timestamp = log.timestamp
yield timestamp, idx, log
+ idx += 1
+
+def _create_sort_key(timestamp: datetime | None, line_num: int) -> int:
+ """
+ Create a sort key for log record, to be used in K-way merge.
+
+ :param timestamp: timestamp of the log line
+ :param line_num: line number of the log line
+ :return: a integer as sort key to avoid overhead of memory usage
+ """
+ return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) *
SORT_KEY_OFFSET + line_num
-def _interleave_logs(*logs: str | LogMessages) ->
Iterable[StructuredLogMessage]:
- min_date = pendulum.datetime(2000, 1, 1)
- records = itertools.chain.from_iterable(_parse_log_lines(log) for log in
logs)
- last = None
- for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date,
x[1])):
- if msg != last or not timestamp: # dedupe
- yield msg
- last = msg
+def _is_sort_key_with_default_timestamp(sort_key: int) -> bool:
+ """
+ Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP.
+
+ This is used to identify log records that don't have timestamp.
+
+ :param sort_key: The sort key to check
+ :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP,
False otherwise
+ """
+ # Extract the timestamp part from the sort key (remove the line number
part)
+ timestamp_part = sort_key // SORT_KEY_OFFSET
+ return timestamp_part == DEFAULT_SORT_TIMESTAMP
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+ heap: list[tuple[int, StructuredLogMessage]],
+ parsed_log_streams: dict[int, ParsedLogStream],
+) -> None:
+ """
+ Add one log record from each parsed log stream to the heap, and will
remove empty log stream from the dict after iterating.
+
+ :param heap: heap to store log records
+ :param parsed_log_streams: dict of parsed log streams
+ """
+ log_stream_to_remove: list[int] | None = None
+ for idx, log_stream in parsed_log_streams.items():
+ record: ParsedLog | None = next(log_stream, None)
+ if record is None:
+ if log_stream_to_remove is None:
+ log_stream_to_remove = []
+ log_stream_to_remove.append(idx)
+ continue
+ # add type hint to avoid mypy error
+ record = cast("ParsedLog", record)
+ timestamp, line_num, line = record
+ # take int as sort key to avoid overhead of memory usage
+ heapq.heappush(heap, (_create_sort_key(timestamp, line_num), line))
+ # remove empty log stream from the dict
+ if log_stream_to_remove is not None:
+ for idx in log_stream_to_remove:
+ del parsed_log_streams[idx]
Review Comment:
```suggestion
log_stream_to_remove: list[int] []
for idx, log_stream in parsed_log_streams.items():
record: ParsedLog | None = next(log_stream, None)
if record is None:
log_stream_to_remove.append(idx)
continue
# add type hint to avoid mypy error
record = cast("ParsedLog", record)
timestamp, line_num, line = record
# take int as sort key to avoid overhead of memory usage
heapq.heappush(heap, (_create_sort_key(timestamp, line_num), line))
# remove empty log stream from the dict
for idx in log_stream_to_remove:
del parsed_log_streams[idx]
```
not sure whether I understand it correctly, but I'm not sure whether we need
the additional logic here
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py:
##########
@@ -138,24 +144,24 @@ def get_log(
with contextlib.suppress(TaskNotFound):
ti.task = dag.get_task(ti.task_id)
- if accept == Mimetype.JSON or accept == Mimetype.ANY: # default
- logs, metadata = task_log_reader.read_log_chunks(ti, try_number,
metadata)
- encoded_token = None
+ if accept == Mimetype.NDJSON: # only specified application/x-ndjson will
return streaming response
+ log_stream = task_log_reader.read_log_stream(ti, try_number, metadata)
# type: ignore[arg-type]
+ headers = None
if not metadata.get("end_of_log", False):
- encoded_token =
URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
- return
TaskInstancesLogResponse.model_construct(continuation_token=encoded_token,
content=logs)
- # text/plain, or something else we don't understand. Return raw log content
-
- # We need to exhaust the iterator before we can generate the continuation
token.
- # We could improve this by making it a streaming/async response, and by
then setting the header using
- # HTTP Trailers
- logs = "".join(task_log_reader.read_log_stream(ti, try_number, metadata))
- headers = None
- if not metadata.get("end_of_log", False):
- headers = {
- "Airflow-Continuation-Token":
URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
- }
- return Response(media_type="application/x-ndjson", content=logs,
headers=headers)
+ headers = {
+ "Airflow-Continuation-Token":
URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
+ }
+ return StreamingResponse(media_type="application/x-ndjson",
content=log_stream, headers=headers)
+
+ # application/json, or something else we don't understand.
+ # Return JSON format, which will be more easily for users to debug.
+ structured_log_stream, out_metadata = task_log_reader.read_log_chunks(ti,
try_number, metadata) # type: ignore[arg-type]
Review Comment:
same here
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -19,51 +19,111 @@
from __future__ import annotations
-import itertools
+import heapq
+import io
import logging
import os
-from collections.abc import Iterable
+from collections.abc import Generator, Iterator
from contextlib import suppress
from datetime import datetime
from enum import Enum
+from itertools import chain
from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, Union
+from types import GeneratorType
+from typing import IO, TYPE_CHECKING, Callable, Optional, TypedDict, Union,
cast
from urllib.parse import urljoin
import pendulum
from pydantic import BaseModel, ConfigDict, ValidationError
+from typing_extensions import NotRequired
from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.helpers import parse_template_string, render_template
+from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import
NonCachingRotatingFileHandler
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import State, TaskInstanceState
if TYPE_CHECKING:
+ from requests import Response
+
from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.typing_compat import TypeAlias
+CHUNK_SIZE = 1024 * 1024 * 5 # 5MB
+DEFAULT_SORT_DATETIME = pendulum.datetime(2000, 1, 1)
+DEFAULT_SORT_TIMESTAMP = int(DEFAULT_SORT_DATETIME.timestamp() * 1000)
+SORT_KEY_OFFSET = 10000000
+"""An offset used by the _create_sort_key utility.
+
+Assuming 50 characters per line, an offset of 10,000,000 can represent
approximately 500 MB of file data, which is sufficient for use as a constant.
+"""
+HEAP_DUMP_SIZE = 5000
+HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
# These types are similar, but have distinct names to make processing them
less error prone
-LogMessages: TypeAlias = Union[list["StructuredLogMessage"], list[str]]
-"""The log messages themselves, either in already sturcutured form, or a
single string blob to be parsed later"""
+LogMessages: TypeAlias = list[str]
+"""The legacy format of log messages, represented as a single string blob to
be parsed later."""
Review Comment:
Let's improve the comment by explaining what this `legacy` means. If I'm not
misunderstood, it's pre-3.0.2
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -423,12 +649,23 @@ def _read(
TaskInstanceState.RUNNING,
TaskInstanceState.DEFERRED,
)
- if metadata and "log_pos" in metadata:
- previous_line = metadata["log_pos"]
- logs = logs[previous_line:] # Cut off previously passed log test
as new tail
- else:
- logs = header + logs
- return logs, {"end_of_log": end_of_log, "log_pos": log_pos}
+
+ with LogStreamAccumulator(out_stream, HEAP_DUMP_SIZE) as
stream_accumulator:
+ log_pos = stream_accumulator.total_lines
+ out_stream = stream_accumulator.stream
+
+ # skip log stream until the last position
+ if metadata and "log_pos" in metadata:
+ for _ in range(metadata["log_pos"]):
+ next(out_stream, None)
Review Comment:
not sure we can use `islice` here
##########
devel-common/src/tests_common/test_utils/file_task_handler.py:
##########
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import itertools
+from collections.abc import Generator, Iterable
+from datetime import datetime
+from typing import TYPE_CHECKING
+
+import pendulum
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if TYPE_CHECKING:
+ from airflow.utils.log.file_task_handler import ParsedLog,
StructuredLogMessage
+
+
+def events(logs: Iterable[StructuredLogMessage], skip_source_info=True) ->
list[str]:
Review Comment:
```suggestion
def extract_events(logs: Iterable[StructuredLogMessage],
skip_source_info=True) -> list[str]:
```
##########
providers/redis/tests/unit/redis/log/test_redis_task_handler.py:
##########
@@ -111,7 +112,12 @@ def test_read(self, ti):
logs = handler.read(ti)
if AIRFLOW_V_3_0_PLUS:
- assert logs == (["Line 1\nLine 2"], {"end_of_log": True})
+ if get_base_airflow_version_tuple() <= (3, 0, 2):
Review Comment:
```suggestion
if get_base_airflow_version_tuple() < (3, 0, 3):
```
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +248,147 @@ def _parse_log_lines(
log.timestamp = coerce_datetime(log.timestamp)
timestamp = log.timestamp
yield timestamp, idx, log
+ idx += 1
+
+def _create_sort_key(timestamp: datetime | None, line_num: int) -> int:
+ """
+ Create a sort key for log record, to be used in K-way merge.
+
+ :param timestamp: timestamp of the log line
+ :param line_num: line number of the log line
+ :return: a integer as sort key to avoid overhead of memory usage
+ """
+ return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) *
SORT_KEY_OFFSET + line_num
-def _interleave_logs(*logs: str | LogMessages) ->
Iterable[StructuredLogMessage]:
- min_date = pendulum.datetime(2000, 1, 1)
- records = itertools.chain.from_iterable(_parse_log_lines(log) for log in
logs)
- last = None
- for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date,
x[1])):
- if msg != last or not timestamp: # dedupe
- yield msg
- last = msg
+def _is_sort_key_with_default_timestamp(sort_key: int) -> bool:
+ """
+ Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP.
+
+ This is used to identify log records that don't have timestamp.
+
+ :param sort_key: The sort key to check
+ :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP,
False otherwise
+ """
+ # Extract the timestamp part from the sort key (remove the line number
part)
+ timestamp_part = sort_key // SORT_KEY_OFFSET
+ return timestamp_part == DEFAULT_SORT_TIMESTAMP
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+ heap: list[tuple[int, StructuredLogMessage]],
+ parsed_log_streams: dict[int, ParsedLogStream],
+) -> None:
+ """
+ Add one log record from each parsed log stream to the heap, and will
remove empty log stream from the dict after iterating.
+
+ :param heap: heap to store log records
+ :param parsed_log_streams: dict of parsed log streams
+ """
+ log_stream_to_remove: list[int] | None = None
+ for idx, log_stream in parsed_log_streams.items():
+ record: ParsedLog | None = next(log_stream, None)
+ if record is None:
+ if log_stream_to_remove is None:
+ log_stream_to_remove = []
+ log_stream_to_remove.append(idx)
+ continue
+ # add type hint to avoid mypy error
+ record = cast("ParsedLog", record)
+ timestamp, line_num, line = record
+ # take int as sort key to avoid overhead of memory usage
+ heapq.heappush(heap, (_create_sort_key(timestamp, line_num), line))
+ # remove empty log stream from the dict
+ if log_stream_to_remove is not None:
+ for idx in log_stream_to_remove:
+ del parsed_log_streams[idx]
+
+
+def _flush_logs_out_of_heap(
+ heap: list[tuple[int, StructuredLogMessage]],
+ flush_size: int,
+ last_log_container: list[StructuredLogMessage | None],
+) -> Generator[StructuredLogMessage, None, None]:
+ """
+ Flush logs out of the heap, deduplicating them based on the last log.
+
+ :param heap: heap to flush logs from
+ :param flush_size: number of logs to flush
+ :param last_log_container: a container to store the last log, to avoid
duplicate logs
+ :return: a generator that yields deduplicated logs
+ """
+ last_log = last_log_container[0]
+ for _ in range(flush_size):
+ sort_key, line = heapq.heappop(heap)
+ if line != last_log or _is_sort_key_with_default_timestamp(sort_key):
# dedupe
+ yield line
+ last_log = line
+ # update the last log container with the last log
+ last_log_container[0] = last_log
+
+
+def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
+ """
+ Merge parsed log streams using K-way merge.
+
+ By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we
can reduce the chance of messing up the global order.
+ Since there are multiple log streams, we can't guarantee that the records
are in global order.
+
+ e.g.
+
+ log_stream1: ----------
+ log_stream2: ----
+ log_stream3: --------
+
+ The first record of log_stream3 is later than the fourth record of
log_stream1 !
+ :param parsed_log_streams: parsed log streams
+ :return: interleaved log stream
+ """
+ # don't need to push whole tuple into heap, which increases too much
overhead
+ # push only sort_key and line into heap
+ heap: list[tuple[int, StructuredLogMessage]] = []
+ # to allow removing empty streams while iterating, also turn the str
stream into parsed log stream
+ parsed_log_streams: dict[int, ParsedLogStream] = {
+ idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream
in enumerate(log_streams)
+ }
+
+ # keep adding records from logs until all logs are empty
+ last_log_container: list[StructuredLogMessage | None] = [None]
+ while parsed_log_streams:
+ _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams)
+
+ # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds
HEAP_DUMP_SIZE
+ if len(heap) >= HEAP_DUMP_SIZE:
+ yield from _flush_logs_out_of_heap(heap, HALF_HEAP_DUMP_SIZE,
last_log_container)
+
+ # yield remaining records
+ yield from _flush_logs_out_of_heap(heap, len(heap), last_log_container)
+ # free memory
+ del heap
+ del parsed_log_streams
+
+
+def _is_logs_stream_like(log) -> bool:
+ """Check if the logs are stream-like."""
+ return isinstance(log, (chain, GeneratorType))
+
+
+def _get_compatible_log_stream(
+ log_messages: LogMessages,
+) -> RawLogStream:
+ """
+ Convert legacy log message blobs into a generator that yields log lines.
+
+ :param log_messages: List of legacy log message strings.
+ :return: A generator that yields interleaved log lines.
+ """
+ log_streams: list[RawLogStream] = [
+ _stream_lines_by_chunk(io.StringIO(log_message)) for log_message in
log_messages
+ ]
+
+ for log_stream in log_streams:
+ yield from log_stream
Review Comment:
```suggestion
yield from chain.from_iterable(
_stream_lines_by_chunk(io.StringIO(log_message)) for log_message in
log_messages
)
```
we can make it a standalone variable if we want to keep the type
##########
providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py:
##########
@@ -204,6 +204,7 @@ def test_read(self, ti):
"on 2023-07-09 07:47:32+00:00"
)
if AIRFLOW_V_3_0_PLUS:
+ logs = list(logs)
Review Comment:
why do we need this?
##########
airflow-core/tests/unit/utils/test_log_handlers.py:
##########
@@ -724,34 +775,244 @@ def test_parse_timestamps():
]
+def test__sort_key():
+ # assert _sort_key should return int
+ assert
isinstance(_create_sort_key(pendulum.parse("2022-11-16T00:05:54.278000-08:00"),
10), int)
Review Comment:
shouldn't we just use the expected int to ensure the value is correct?
##########
airflow-core/tests/unit/utils/log/test_stream_accumulator.py:
##########
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+from typing import TYPE_CHECKING
+from unittest import mock
+
+import pendulum
+import pytest
+
+from airflow.utils.log.file_task_handler import StructuredLogMessage
+from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
+
+if TYPE_CHECKING:
+ from airflow.utils.log.file_task_handler import LogHandlerOutputStream
+
+LOG_START_DATETIME = pendulum.datetime(2023, 10, 1, 0, 0, 0)
+LOG_COUNT = 20
+
+
+class TestLogStreamAccumulator:
+ """Test cases for the LogStreamAccumulator class."""
+
+ @pytest.fixture
+ def structured_logs(self):
+ """Create a stream of mock structured log messages."""
+
+ def generate_logs():
+ for i in range(LOG_COUNT):
+ yield StructuredLogMessage(
+ event=f"test_event_{i + 1}",
+ timestamp=LOG_START_DATETIME.add(seconds=i),
+ level="INFO",
+ message=f"Test log message {i + 1}",
+ )
Review Comment:
```suggestion
yield from (
StructuredLogMessage(
event=f"test_event_{i + 1}",
timestamp=LOG_START_DATETIME.add(seconds=i),
level="INFO",
message=f"Test log message {i + 1}",
)
for i in range(LOG_COUNT)
)
```
##########
airflow-core/tests/unit/utils/test_log_handlers.py:
##########
@@ -188,14 +189,16 @@ def task_callable(ti):
file_handler.close()
assert hasattr(file_handler, "read")
- log, metadata = file_handler.read(ti, 1)
+ log_handler_output_stream, metadata = file_handler.read(ti, 1)
assert isinstance(metadata, dict)
target_re = re.compile(r"\A\[[^\]]+\] {test_log_handlers.py:\d+} INFO
- test\Z")
# We should expect our log line from the callable above to appear in
# the logs we read back
- assert any(re.search(target_re, e) for e in events(log)), "Logs were "
+ str(log)
+ assert any(re.search(target_re, e) for e in
events(log_handler_output_stream)), "Logs were " + str(
+ log_handler_output_stream
+ )
Review Comment:
```suggestion
assert any(re.search(target_re, e) for e in
events(log_handler_output_stream)), f"Logs were {log_handler_output_stream}"
```
##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -346,7 +347,9 @@ def _read(
:return: a list of tuple with host and log documents, metadata.
"""
if not metadata:
- metadata = {"offset": 0}
+ metadata = {"offset": 0} # type: ignore[assignment]
Review Comment:
Let's add a comment on why this ignoring is needed
##########
airflow-core/src/airflow/utils/log/log_stream_accumulator.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+import tempfile
+from itertools import islice
+from typing import IO, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from airflow.typing_compat import Self
+ from airflow.utils.log.file_task_handler import (
+ LogHandlerOutputStream,
+ StructuredLogMessage,
+ StructuredLogStream,
+ )
+
+
+class LogStreamAccumulator:
+ """
+ Memory-efficient log stream accumulator that tracks the total number of
lines while preserving the original stream.
+
+ This class captures logs from a stream and stores them in a buffer,
flushing them to disk when the buffer
+ exceeds a specified threshold. This approach optimizes memory usage while
handling large log streams.
+
+ Usage:
+
+ .. code-block:: python
+
+ with LogStreamAccumulator(stream, threshold) as log_accumulator:
+ # Get total number of lines captured
+ total_lines = log_accumulator.get_total_lines()
+
+ # Retrieve the original stream of logs
+ for log in log_accumulator.get_stream():
+ print(log)
+ """
+
+ def __init__(
+ self,
+ stream: LogHandlerOutputStream,
+ threshold: int,
+ ) -> None:
+ """
+ Initialize the LogStreamAccumulator.
+
+ Args:
+ stream: The input log stream to capture and count.
+ threshold: Maximum number of lines to keep in memory before
flushing to disk.
+ """
+ self._stream = stream
+ self._threshold = threshold
+ self._buffer: list[StructuredLogMessage] = []
+ self._disk_lines: int = 0
+ self._tmpfile: IO[str] | None = None
+
+ def _flush_buffer_to_disk(self) -> None:
+ """Flush the buffer contents to a temporary file on disk."""
+ if self._tmpfile is None:
+ self._tmpfile = tempfile.NamedTemporaryFile(delete=False,
mode="w+", encoding="utf-8")
+
+ self._disk_lines += len(self._buffer)
+ self._tmpfile.writelines(f"{log.model_dump_json()}\n" for log in
self._buffer)
+ self._tmpfile.flush()
+ self._buffer.clear()
+
+ def _capture(self) -> None:
+ """Capture logs from the stream into the buffer, flushing to disk when
threshold is reached."""
+ while True:
+ # `islice` will try to get up to `self._threshold` lines from the
stream.
+ self._buffer.extend(islice(self._stream, self._threshold))
+ # If the buffer has reached the threshold, flush it to disk.
+ if len(self._buffer) >= self._threshold:
+ self._flush_buffer_to_disk()
+ else: # If there are no more lines to capture, exit the loop.
+ break
Review Comment:
it seems to be missed here
##########
airflow-core/tests/unit/utils/test_log_handlers.py:
##########
@@ -724,34 +775,244 @@ def test_parse_timestamps():
]
+def test__sort_key():
+ # assert _sort_key should return int
+ assert
isinstance(_create_sort_key(pendulum.parse("2022-11-16T00:05:54.278000-08:00"),
10), int)
+
+
[email protected](
+ "timestamp, line_num, expected",
+ [
+ pytest.param(
+ pendulum.parse("2022-11-16T00:05:54.278000-08:00"),
+ 10,
+ False,
+ id="normal_timestamp_1",
+ ),
+ pytest.param(
+ pendulum.parse("2022-11-16T00:05:54.457000-08:00"),
+ 2025,
+ False,
+ id="normal_timestamp_2",
+ ),
+ pytest.param(
+ DEFAULT_SORT_DATETIME,
+ 200,
+ True,
+ id="default_timestamp",
+ ),
+ ],
+)
+def test__is_sort_key_with_default_timestamp(timestamp, line_num, expected):
+ assert _is_sort_key_with_default_timestamp(_create_sort_key(timestamp,
line_num)) == expected
+
+
[email protected](
+ "log_stream, expected",
+ [
+ pytest.param(
+ convert_list_to_stream(
+ [
+ "2022-11-16T00:05:54.278000-08:00",
+ "2022-11-16T00:05:54.457000-08:00",
+ ]
+ ),
+ True,
+ id="normal_log_stream",
+ ),
+ pytest.param(
+ itertools.chain(
+ [
+ "2022-11-16T00:05:54.278000-08:00",
+ "2022-11-16T00:05:54.457000-08:00",
+ ],
+ convert_list_to_stream(
+ [
+ "2022-11-16T00:05:54.278000-08:00",
+ "2022-11-16T00:05:54.457000-08:00",
+ ]
+ ),
+ ),
+ True,
+ id="chain_log_stream",
+ ),
+ pytest.param(
+ [
+ "2022-11-16T00:05:54.278000-08:00",
+ "2022-11-16T00:05:54.457000-08:00",
+ ],
+ False,
+ id="non_stream_log",
+ ),
+ ],
+)
+def test__is_logs_stream_like(log_stream, expected):
+ assert _is_logs_stream_like(log_stream) == expected
+
+
+def test__add_log_from_parsed_log_streams_to_heap():
Review Comment:
Let's add some comments to clarify what we're checking here. I'm not getting
the length checking part
##########
airflow-core/tests/unit/utils/log/test_stream_accumulator.py:
##########
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+from typing import TYPE_CHECKING
+from unittest import mock
+
+import pendulum
+import pytest
+
+from airflow.utils.log.file_task_handler import StructuredLogMessage
+from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
+
+if TYPE_CHECKING:
+ from airflow.utils.log.file_task_handler import LogHandlerOutputStream
+
+LOG_START_DATETIME = pendulum.datetime(2023, 10, 1, 0, 0, 0)
+LOG_COUNT = 20
+
+
+class TestLogStreamAccumulator:
+ """Test cases for the LogStreamAccumulator class."""
+
+ @pytest.fixture
+ def structured_logs(self):
+ """Create a stream of mock structured log messages."""
+
+ def generate_logs():
+ for i in range(LOG_COUNT):
+ yield StructuredLogMessage(
+ event=f"test_event_{i + 1}",
+ timestamp=LOG_START_DATETIME.add(seconds=i),
+ level="INFO",
+ message=f"Test log message {i + 1}",
+ )
Review Comment:
Hmmmm... we're using `LOG_COUNT = 20` above. Should it be an argument
instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]