Lee-W commented on code in PR #49470:
URL: https://github.com/apache/airflow/pull/49470#discussion_r2151424340
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +242,133 @@ def _parse_log_lines(
log.timestamp = coerce_datetime(log.timestamp)
timestamp = log.timestamp
yield timestamp, idx, log
+ idx += 1
-def _interleave_logs(*logs: str | LogMessages) ->
Iterable[StructuredLogMessage]:
- min_date = pendulum.datetime(2000, 1, 1)
+def _sort_key(timestamp: datetime | None, line_num: int) -> int:
+ """
+ Generate a sort key for log record, to be used in K-way merge.
- records = itertools.chain.from_iterable(_parse_log_lines(log) for log in
logs)
+ :param timestamp: timestamp of the log line
+ :param line_num: line number of the log line
+ :return: a integer as sort key to avoid overhead of memory usage
+ """
+ return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) *
SORT_KEY_OFFSET + line_num
+
+
+def _is_sort_key_with_default_timestamp(sort_key: int) -> bool:
+ """
+ Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP.
+
+ This is used to identify log records that don't have timestamp.
+
+ :param sort_key: The sort key to check
+ :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP,
False otherwise
+ """
+ # Extract the timestamp part from the sort key (remove the line number
part)
+ timestamp_part = sort_key // SORT_KEY_OFFSET
+ return timestamp_part == DEFAULT_SORT_TIMESTAMP
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+ heap: list[tuple[int, StructuredLogMessage]],
+ parsed_log_streams: dict[int, ParsedLogStream],
+) -> None:
+ """
+ Add one log record from each parsed log stream to the heap, and will
remove empty log stream from the dict after iterating.
+
+ :param heap: heap to store log records
+ :param parsed_log_streams: dict of parsed log streams
+ """
+ log_stream_to_remove: list[int] | None = None
+ for idx, log_stream in parsed_log_streams.items():
+ record: ParsedLog | None = next(log_stream, None)
+ if record is None:
+ if log_stream_to_remove is None:
+ log_stream_to_remove = []
+ log_stream_to_remove.append(idx)
+ continue
+ # add type hint to avoid mypy error
+ record = cast("ParsedLog", record)
+ timestamp, line_num, line = record
+ # take int as sort key to avoid overhead of memory usage
+ heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+ # remove empty log stream from the dict
+ if log_stream_to_remove is not None:
+ for idx in log_stream_to_remove:
+ del parsed_log_streams[idx]
+
+
+def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
+ """
+ Merge parsed log streams using K-way merge.
+
+ By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we
can reduce the chance of messing up the global order.
+ Since there are multiple log streams, we can't guarantee that the records
are in global order.
+
+ e.g.
+
+ log_stream1: ----------
+ log_stream2: ----
+ log_stream3: --------
+
+ The first record of log_stream3 is later than the fourth record of
log_stream1 !
+ :param parsed_log_streams: parsed log streams
+ :return: interleaved log stream
+ """
+ # don't need to push whole tuple into heap, which increases too much
overhead
+ # push only sort_key and line into heap
+ heap: list[tuple[int, StructuredLogMessage]] = []
+ # to allow removing empty streams while iterating, also turn the str
stream into parsed log stream
+ parsed_log_streams: dict[int, ParsedLogStream] = {
+ idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream
in enumerate(log_streams)
+ }
+
+ # keep adding records from logs until all logs are empty
last = None
- for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date,
x[1])):
- if msg != last or not timestamp: # dedupe
- yield msg
- last = msg
+ while parsed_log_streams:
+ _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams)
+
+ # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds
HEAP_DUMP_SIZE
+ if len(heap) >= HEAP_DUMP_SIZE:
+ for _ in range(HALF_HEAP_DUMP_SIZE):
+ sort_key, line = heapq.heappop(heap)
+ if line != last or
_is_sort_key_with_default_timestamp(sort_key): # dedupe
+ yield line
+ last = line
+
+ # yield remaining records
+ for _ in range(len(heap)):
+ sort_key, line = heapq.heappop(heap)
+ if line != last or _is_sort_key_with_default_timestamp(sort_key): #
dedupe
+ yield line
+ last = line
+ # free memory
+ del heap
+ del parsed_log_streams
+
+
+def _is_logs_stream_like(log):
Review Comment:
```suggestion
def _is_logs_stream_like(log) -> bool:
```
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +242,133 @@ def _parse_log_lines(
log.timestamp = coerce_datetime(log.timestamp)
timestamp = log.timestamp
yield timestamp, idx, log
+ idx += 1
-def _interleave_logs(*logs: str | LogMessages) ->
Iterable[StructuredLogMessage]:
- min_date = pendulum.datetime(2000, 1, 1)
+def _sort_key(timestamp: datetime | None, line_num: int) -> int:
+ """
+ Generate a sort key for log record, to be used in K-way merge.
- records = itertools.chain.from_iterable(_parse_log_lines(log) for log in
logs)
+ :param timestamp: timestamp of the log line
+ :param line_num: line number of the log line
+ :return: a integer as sort key to avoid overhead of memory usage
+ """
+ return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) *
SORT_KEY_OFFSET + line_num
+
+
+def _is_sort_key_with_default_timestamp(sort_key: int) -> bool:
+ """
+ Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP.
+
+ This is used to identify log records that don't have timestamp.
+
+ :param sort_key: The sort key to check
+ :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP,
False otherwise
+ """
+ # Extract the timestamp part from the sort key (remove the line number
part)
+ timestamp_part = sort_key // SORT_KEY_OFFSET
+ return timestamp_part == DEFAULT_SORT_TIMESTAMP
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+ heap: list[tuple[int, StructuredLogMessage]],
+ parsed_log_streams: dict[int, ParsedLogStream],
+) -> None:
+ """
+ Add one log record from each parsed log stream to the heap, and will
remove empty log stream from the dict after iterating.
+
+ :param heap: heap to store log records
+ :param parsed_log_streams: dict of parsed log streams
+ """
+ log_stream_to_remove: list[int] | None = None
+ for idx, log_stream in parsed_log_streams.items():
+ record: ParsedLog | None = next(log_stream, None)
+ if record is None:
+ if log_stream_to_remove is None:
+ log_stream_to_remove = []
+ log_stream_to_remove.append(idx)
+ continue
+ # add type hint to avoid mypy error
+ record = cast("ParsedLog", record)
+ timestamp, line_num, line = record
+ # take int as sort key to avoid overhead of memory usage
+ heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+ # remove empty log stream from the dict
+ if log_stream_to_remove is not None:
+ for idx in log_stream_to_remove:
+ del parsed_log_streams[idx]
+
+
+def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
+ """
+ Merge parsed log streams using K-way merge.
+
+ By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we
can reduce the chance of messing up the global order.
+ Since there are multiple log streams, we can't guarantee that the records
are in global order.
+
+ e.g.
+
+ log_stream1: ----------
+ log_stream2: ----
+ log_stream3: --------
+
+ The first record of log_stream3 is later than the fourth record of
log_stream1 !
+ :param parsed_log_streams: parsed log streams
+ :return: interleaved log stream
+ """
+ # don't need to push whole tuple into heap, which increases too much
overhead
+ # push only sort_key and line into heap
+ heap: list[tuple[int, StructuredLogMessage]] = []
+ # to allow removing empty streams while iterating, also turn the str
stream into parsed log stream
+ parsed_log_streams: dict[int, ParsedLogStream] = {
+ idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream
in enumerate(log_streams)
+ }
+
+ # keep adding records from logs until all logs are empty
last = None
- for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date,
x[1])):
- if msg != last or not timestamp: # dedupe
- yield msg
- last = msg
+ while parsed_log_streams:
+ _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams)
+
+ # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds
HEAP_DUMP_SIZE
+ if len(heap) >= HEAP_DUMP_SIZE:
+ for _ in range(HALF_HEAP_DUMP_SIZE):
+ sort_key, line = heapq.heappop(heap)
+ if line != last or
_is_sort_key_with_default_timestamp(sort_key): # dedupe
+ yield line
+ last = line
+
+ # yield remaining records
+ for _ in range(len(heap)):
+ sort_key, line = heapq.heappop(heap)
+ if line != last or _is_sort_key_with_default_timestamp(sort_key): #
dedupe
+ yield line
+ last = line
Review Comment:
```suggestion
sort_key, line = heapq.heappop(heap)
if line != last or _is_sort_key_with_default_timestamp(sort_key): #
dedupe
yield line
last = line
```
nit: we might be able to extract it as a function as it looks similar to the
lines above
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +242,133 @@ def _parse_log_lines(
log.timestamp = coerce_datetime(log.timestamp)
timestamp = log.timestamp
yield timestamp, idx, log
+ idx += 1
-def _interleave_logs(*logs: str | LogMessages) ->
Iterable[StructuredLogMessage]:
- min_date = pendulum.datetime(2000, 1, 1)
+def _sort_key(timestamp: datetime | None, line_num: int) -> int:
+ """
+ Generate a sort key for log record, to be used in K-way merge.
- records = itertools.chain.from_iterable(_parse_log_lines(log) for log in
logs)
+ :param timestamp: timestamp of the log line
+ :param line_num: line number of the log line
+ :return: a integer as sort key to avoid overhead of memory usage
+ """
+ return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) *
SORT_KEY_OFFSET + line_num
+
+
+def _is_sort_key_with_default_timestamp(sort_key: int) -> bool:
+ """
+ Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP.
+
+ This is used to identify log records that don't have timestamp.
+
+ :param sort_key: The sort key to check
+ :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP,
False otherwise
+ """
+ # Extract the timestamp part from the sort key (remove the line number
part)
+ timestamp_part = sort_key // SORT_KEY_OFFSET
+ return timestamp_part == DEFAULT_SORT_TIMESTAMP
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+ heap: list[tuple[int, StructuredLogMessage]],
+ parsed_log_streams: dict[int, ParsedLogStream],
+) -> None:
+ """
+ Add one log record from each parsed log stream to the heap, and will
remove empty log stream from the dict after iterating.
+
+ :param heap: heap to store log records
+ :param parsed_log_streams: dict of parsed log streams
+ """
+ log_stream_to_remove: list[int] | None = None
+ for idx, log_stream in parsed_log_streams.items():
+ record: ParsedLog | None = next(log_stream, None)
+ if record is None:
+ if log_stream_to_remove is None:
+ log_stream_to_remove = []
+ log_stream_to_remove.append(idx)
+ continue
+ # add type hint to avoid mypy error
+ record = cast("ParsedLog", record)
+ timestamp, line_num, line = record
+ # take int as sort key to avoid overhead of memory usage
+ heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+ # remove empty log stream from the dict
+ if log_stream_to_remove is not None:
+ for idx in log_stream_to_remove:
+ del parsed_log_streams[idx]
+
+
+def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
+ """
+ Merge parsed log streams using K-way merge.
+
+ By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we
can reduce the chance of messing up the global order.
+ Since there are multiple log streams, we can't guarantee that the records
are in global order.
+
+ e.g.
+
+ log_stream1: ----------
+ log_stream2: ----
+ log_stream3: --------
+
+ The first record of log_stream3 is later than the fourth record of
log_stream1 !
+ :param parsed_log_streams: parsed log streams
+ :return: interleaved log stream
+ """
+ # don't need to push whole tuple into heap, which increases too much
overhead
+ # push only sort_key and line into heap
+ heap: list[tuple[int, StructuredLogMessage]] = []
+ # to allow removing empty streams while iterating, also turn the str
stream into parsed log stream
+ parsed_log_streams: dict[int, ParsedLogStream] = {
+ idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream
in enumerate(log_streams)
+ }
+
+ # keep adding records from logs until all logs are empty
last = None
- for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date,
x[1])):
- if msg != last or not timestamp: # dedupe
- yield msg
- last = msg
+ while parsed_log_streams:
+ _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams)
+
+ # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds
HEAP_DUMP_SIZE
+ if len(heap) >= HEAP_DUMP_SIZE:
+ for _ in range(HALF_HEAP_DUMP_SIZE):
+ sort_key, line = heapq.heappop(heap)
+ if line != last or
_is_sort_key_with_default_timestamp(sort_key): # dedupe
+ yield line
+ last = line
+
+ # yield remaining records
+ for _ in range(len(heap)):
+ sort_key, line = heapq.heappop(heap)
+ if line != last or _is_sort_key_with_default_timestamp(sort_key): #
dedupe
+ yield line
+ last = line
+ # free memory
+ del heap
+ del parsed_log_streams
+
+
+def _is_logs_stream_like(log):
+ """Check if the logs are stream-like."""
+ return isinstance(log, chain) or isinstance(log, GeneratorType)
Review Comment:
```suggestion
return isinstance(log, (chain, GeneratorType))
```
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +242,133 @@ def _parse_log_lines(
log.timestamp = coerce_datetime(log.timestamp)
timestamp = log.timestamp
yield timestamp, idx, log
+ idx += 1
-def _interleave_logs(*logs: str | LogMessages) ->
Iterable[StructuredLogMessage]:
- min_date = pendulum.datetime(2000, 1, 1)
+def _sort_key(timestamp: datetime | None, line_num: int) -> int:
+ """
+ Generate a sort key for log record, to be used in K-way merge.
- records = itertools.chain.from_iterable(_parse_log_lines(log) for log in
logs)
+ :param timestamp: timestamp of the log line
+ :param line_num: line number of the log line
+ :return: a integer as sort key to avoid overhead of memory usage
+ """
+ return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) *
SORT_KEY_OFFSET + line_num
+
+
+def _is_sort_key_with_default_timestamp(sort_key: int) -> bool:
+ """
+ Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP.
+
+ This is used to identify log records that don't have timestamp.
+
+ :param sort_key: The sort key to check
+ :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP,
False otherwise
+ """
+ # Extract the timestamp part from the sort key (remove the line number
part)
+ timestamp_part = sort_key // SORT_KEY_OFFSET
+ return timestamp_part == DEFAULT_SORT_TIMESTAMP
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+ heap: list[tuple[int, StructuredLogMessage]],
+ parsed_log_streams: dict[int, ParsedLogStream],
+) -> None:
+ """
+ Add one log record from each parsed log stream to the heap, and will
remove empty log stream from the dict after iterating.
+
+ :param heap: heap to store log records
+ :param parsed_log_streams: dict of parsed log streams
+ """
+ log_stream_to_remove: list[int] | None = None
+ for idx, log_stream in parsed_log_streams.items():
+ record: ParsedLog | None = next(log_stream, None)
+ if record is None:
+ if log_stream_to_remove is None:
+ log_stream_to_remove = []
+ log_stream_to_remove.append(idx)
+ continue
+ # add type hint to avoid mypy error
+ record = cast("ParsedLog", record)
+ timestamp, line_num, line = record
+ # take int as sort key to avoid overhead of memory usage
+ heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+ # remove empty log stream from the dict
+ if log_stream_to_remove is not None:
+ for idx in log_stream_to_remove:
+ del parsed_log_streams[idx]
+
+
+def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
+ """
+ Merge parsed log streams using K-way merge.
+
+ By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we
can reduce the chance of messing up the global order.
+ Since there are multiple log streams, we can't guarantee that the records
are in global order.
+
+ e.g.
+
+ log_stream1: ----------
+ log_stream2: ----
+ log_stream3: --------
+
+ The first record of log_stream3 is later than the fourth record of
log_stream1 !
+ :param parsed_log_streams: parsed log streams
+ :return: interleaved log stream
+ """
+ # don't need to push whole tuple into heap, which increases too much
overhead
+ # push only sort_key and line into heap
+ heap: list[tuple[int, StructuredLogMessage]] = []
+ # to allow removing empty streams while iterating, also turn the str
stream into parsed log stream
+ parsed_log_streams: dict[int, ParsedLogStream] = {
+ idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream
in enumerate(log_streams)
+ }
+
+ # keep adding records from logs until all logs are empty
last = None
- for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date,
x[1])):
- if msg != last or not timestamp: # dedupe
- yield msg
- last = msg
+ while parsed_log_streams:
+ _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams)
+
+ # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds
HEAP_DUMP_SIZE
+ if len(heap) >= HEAP_DUMP_SIZE:
+ for _ in range(HALF_HEAP_DUMP_SIZE):
+ sort_key, line = heapq.heappop(heap)
+ if line != last or
_is_sort_key_with_default_timestamp(sort_key): # dedupe
+ yield line
+ last = line
+
+ # yield remaining records
+ for _ in range(len(heap)):
+ sort_key, line = heapq.heappop(heap)
+ if line != last or _is_sort_key_with_default_timestamp(sort_key): #
dedupe
+ yield line
+ last = line
+ # free memory
+ del heap
+ del parsed_log_streams
+
+
+def _is_logs_stream_like(log):
+ """Check if the logs are stream-like."""
+ return isinstance(log, chain) or isinstance(log, GeneratorType)
+
+
+def _get_compatible_log_stream(
+ log_messages: LogMessages,
+) -> RawLogStream:
+ """
+ Convert legacy log message blobs into a generator that yields log lines.
+
+ :param log_messages: List of legacy log message strings.
+ :return: A generator that yields interleaved log lines.
+ """
+ log_streams: list[RawLogStream] = []
+ for log_message in log_messages:
+ # Append the log stream to the list
+ log_streams.append(_stream_lines_by_chunk(io.StringIO(log_message)))
Review Comment:
```suggestion
log_streams: list[RawLogStream] = [
_stream_lines_by_chunk(io.StringIO(log_message))
for log_message in log_messages
]
```
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -36,24 +39,66 @@
from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.helpers import parse_template_string, render_template
+from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import
NonCachingRotatingFileHandler
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import State, TaskInstanceState
if TYPE_CHECKING:
+ from requests import Response
+
from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.typing_compat import TypeAlias
+CHUNK_SIZE = 1024 * 1024 * 5 # 5MB
+DEFAULT_SORT_DATETIME = pendulum.datetime(2000, 1, 1)
+DEFAULT_SORT_TIMESTAMP = int(DEFAULT_SORT_DATETIME.timestamp() * 1000)
+SORT_KEY_OFFSET = 10000000
+"""An offset used by the _sort_key utility.
+
+Assuming 50 characters per line, an offset of 10,000,000 can represent
approximately 500 MB of file data, which is sufficient for use as a constant.
+"""
+HEAP_DUMP_SIZE = 5000
+HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
# These types are similar, but have distinct names to make processing them
less error prone
-LogMessages: TypeAlias = Union[list["StructuredLogMessage"], list[str]]
-"""The log messages themselves, either in already sturcutured form, or a
single string blob to be parsed later"""
+LogMessages: TypeAlias = list[str]
+"""The legacy format of log messages, represented as a single string blob to
be parsed later."""
LogSourceInfo: TypeAlias = list[str]
"""Information _about_ the log fetching process for display to a user"""
LogMetadata: TypeAlias = dict[str, Any]
Review Comment:
Not sure whether `TypedDict` would be better.
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +242,133 @@ def _parse_log_lines(
log.timestamp = coerce_datetime(log.timestamp)
timestamp = log.timestamp
yield timestamp, idx, log
+ idx += 1
-def _interleave_logs(*logs: str | LogMessages) ->
Iterable[StructuredLogMessage]:
- min_date = pendulum.datetime(2000, 1, 1)
+def _sort_key(timestamp: datetime | None, line_num: int) -> int:
Review Comment:
```suggestion
def _create_sort_key(timestamp: datetime | None, line_num: int) -> int:
```
not sure whether it's better 🤔 it kinda confused me when I saw `sort_key` is
a noun and `_sort_key` looks like a verb that returns `sort_key`
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -36,24 +39,66 @@
from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.helpers import parse_template_string, render_template
+from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import
NonCachingRotatingFileHandler
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import State, TaskInstanceState
if TYPE_CHECKING:
+ from requests import Response
+
from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.typing_compat import TypeAlias
+CHUNK_SIZE = 1024 * 1024 * 5 # 5MB
+DEFAULT_SORT_DATETIME = pendulum.datetime(2000, 1, 1)
+DEFAULT_SORT_TIMESTAMP = int(DEFAULT_SORT_DATETIME.timestamp() * 1000)
+SORT_KEY_OFFSET = 10000000
+"""An offset used by the _sort_key utility.
+
+Assuming 50 characters per line, an offset of 10,000,000 can represent
approximately 500 MB of file data, which is sufficient for use as a constant.
+"""
+HEAP_DUMP_SIZE = 5000
+HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
# These types are similar, but have distinct names to make processing them
less error prone
-LogMessages: TypeAlias = Union[list["StructuredLogMessage"], list[str]]
-"""The log messages themselves, either in already sturcutured form, or a
single string blob to be parsed later"""
+LogMessages: TypeAlias = list[str]
+"""The legacy format of log messages, represented as a single string blob to
be parsed later."""
LogSourceInfo: TypeAlias = list[str]
"""Information _about_ the log fetching process for display to a user"""
LogMetadata: TypeAlias = dict[str, Any]
+"""Metadata about the log fetching process, including `end_of_log` and
`log_pos`.
+
+- `end_of_log`: Boolean. Indicates if the log has ended.
+- `log_pos`: Integer. The absolute character position up to which the log was
retrieved across all sources.
+"""
+RawLogStream: TypeAlias = Generator[str, None, None]
+"""Raw log stream, containing unparsed log lines."""
+LegacyLogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages]
Review Comment:
By `Legacy` do you mean airflow 2 or before this PR
##########
airflow-core/src/airflow/utils/log/log_stream_accumulator.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+import tempfile
+from itertools import islice
+from typing import IO, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from airflow.typing_compat import Self
+ from airflow.utils.log.file_task_handler import (
+ LogHandlerOutputStream,
+ StructuredLogMessage,
+ StructuredLogStream,
+ )
+
+
+class LogStreamAccumulator:
+ """
+ Memory-efficient log stream accumulator that tracks the total number of
lines while preserving the original stream.
+
+ This class captures logs from a stream and stores them in a buffer,
flushing them to disk when the buffer
+ exceeds a specified threshold. This approach optimizes memory usage while
handling large log streams.
+
+ Usage:
+
+ .. code-block:: python
+
+ with LogStreamAccumulator(stream, threshold) as log_accumulator:
+ # Get total number of lines captured
+ total_lines = log_accumulator.get_total_lines()
+
+ # Retrieve the original stream of logs
+ for log in log_accumulator.get_stream():
+ print(log)
+ """
+
+ def __init__(
+ self,
+ stream: LogHandlerOutputStream,
+ threshold: int,
+ ) -> None:
+ """
+ Initialize the LogStreamAccumulator.
+
+ Args:
+ stream: The input log stream to capture and count.
+ threshold: Maximum number of lines to keep in memory before
flushing to disk.
+ """
+ self._stream = stream
+ self._threshold = threshold
+ self._buffer: list[StructuredLogMessage] = []
+ self._disk_lines: int = 0
+ self._tmpfile: IO[str] | None = None
+
+ def _flush_buffer_to_disk(self) -> None:
+ """Flush the buffer contents to a temporary file on disk."""
+ if self._tmpfile is None:
+ self._tmpfile = tempfile.NamedTemporaryFile(delete=False,
mode="w+", encoding="utf-8")
+
+ self._disk_lines += len(self._buffer)
+ self._tmpfile.writelines(f"{log.model_dump_json()}\n" for log in
self._buffer)
+ self._tmpfile.flush()
+ self._buffer.clear()
+
+ def _capture(self) -> None:
+ """Capture logs from the stream into the buffer, flushing to disk when
threshold is reached."""
+ while True:
+ # `islice` will try to get up to `self._threshold` lines from the
stream.
+ self._buffer.extend(islice(self._stream, self._threshold))
+ # If the buffer has reached the threshold, flush it to disk.
+ if len(self._buffer) >= self._threshold:
+ self._flush_buffer_to_disk()
+ else: # If there are no more lines to capture, exit the loop.
+ break
+
+ def _cleanup(self) -> None:
+ """Clean up the temporary file if it exists."""
+ self._buffer.clear()
+ if self._tmpfile:
+ self._tmpfile.close()
+ os.remove(self._tmpfile.name)
+ self._tmpfile = None
+
+ def get_total_lines(self) -> int:
+ """
+ Return the total number of lines captured from the stream.
+
+ Returns:
+ The sum of lines stored in the buffer and lines written to disk.
+ """
+ return self._disk_lines + len(self._buffer)
+
+ def get_stream(self) -> StructuredLogStream:
Review Comment:
not sure whether making it a property makes sense.
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -370,22 +562,38 @@ def _read(
# initializing the handler. Thus explicitly getting log location
# is needed to get correct log path.
worker_log_rel_path = self._render_filename(ti, try_number)
+ sources: LogSourceInfo = []
source_list: list[str] = []
- remote_logs: LogMessages | None = []
- local_logs: list[str] = []
- sources: list[str] = []
- executor_logs: list[str] = []
- served_logs: LogMessages = []
+ remote_logs: list[RawLogStream] = []
+ local_logs: list[RawLogStream] = []
+ executor_logs: list[RawLogStream] = []
+ served_logs: list[RawLogStream] = []
with suppress(NotImplementedError):
- sources, remote_logs = self._read_remote_logs(ti, try_number,
metadata)
-
+ sources, logs = self._read_remote_logs(ti, try_number, metadata)
+ if not len(logs):
Review Comment:
```suggestion
if not logs:
```
##########
airflow-core/src/airflow/utils/serve_logs.py:
##########
@@ -135,7 +135,7 @@ def validate_pre_signed_url():
@flask_app.route("/log/<path:filename>")
def serve_logs_view(filename):
- return send_from_directory(log_directory, filename,
mimetype="application/json", as_attachment=False)
+ return send_from_directory(log_directory, filename,
mimetype="text/plain", as_attachment=False)
Review Comment:
May I know why we changed this?
##########
airflow-core/src/airflow/utils/log/log_stream_accumulator.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+import tempfile
+from itertools import islice
+from typing import IO, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from airflow.typing_compat import Self
+ from airflow.utils.log.file_task_handler import (
+ LogHandlerOutputStream,
+ StructuredLogMessage,
+ StructuredLogStream,
+ )
+
+
+class LogStreamAccumulator:
+ """
+ Memory-efficient log stream accumulator that tracks the total number of
lines while preserving the original stream.
+
+ This class captures logs from a stream and stores them in a buffer,
flushing them to disk when the buffer
+ exceeds a specified threshold. This approach optimizes memory usage while
handling large log streams.
+
+ Usage:
+
+ .. code-block:: python
+
+ with LogStreamAccumulator(stream, threshold) as log_accumulator:
+ # Get total number of lines captured
+ total_lines = log_accumulator.get_total_lines()
+
+ # Retrieve the original stream of logs
+ for log in log_accumulator.get_stream():
+ print(log)
+ """
+
+ def __init__(
+ self,
+ stream: LogHandlerOutputStream,
+ threshold: int,
+ ) -> None:
+ """
+ Initialize the LogStreamAccumulator.
+
+ Args:
+ stream: The input log stream to capture and count.
+ threshold: Maximum number of lines to keep in memory before
flushing to disk.
+ """
+ self._stream = stream
+ self._threshold = threshold
+ self._buffer: list[StructuredLogMessage] = []
+ self._disk_lines: int = 0
+ self._tmpfile: IO[str] | None = None
+
+ def _flush_buffer_to_disk(self) -> None:
+ """Flush the buffer contents to a temporary file on disk."""
+ if self._tmpfile is None:
+ self._tmpfile = tempfile.NamedTemporaryFile(delete=False,
mode="w+", encoding="utf-8")
+
+ self._disk_lines += len(self._buffer)
+ self._tmpfile.writelines(f"{log.model_dump_json()}\n" for log in
self._buffer)
+ self._tmpfile.flush()
+ self._buffer.clear()
+
+ def _capture(self) -> None:
+ """Capture logs from the stream into the buffer, flushing to disk when
threshold is reached."""
+ while True:
+ # `islice` will try to get up to `self._threshold` lines from the
stream.
+ self._buffer.extend(islice(self._stream, self._threshold))
+ # If the buffer has reached the threshold, flush it to disk.
+ if len(self._buffer) >= self._threshold:
+ self._flush_buffer_to_disk()
+ else: # If there are no more lines to capture, exit the loop.
+ break
+
+ def _cleanup(self) -> None:
+ """Clean up the temporary file if it exists."""
+ self._buffer.clear()
+ if self._tmpfile:
+ self._tmpfile.close()
+ os.remove(self._tmpfile.name)
+ self._tmpfile = None
+
+ def get_total_lines(self) -> int:
Review Comment:
not sure whether making it a property makes sense.
##########
airflow-core/src/airflow/utils/log/log_reader.py:
##########
@@ -72,7 +73,7 @@ def read_log_chunks(
contain information about the task log which can enable you read logs
to the
end.
"""
- return self.log_handler.read(ti, try_number, metadata=metadata)
+ return self.log_handler.read(ti, try_number, metadata)
Review Comment:
why this change?
##########
airflow-core/src/airflow/utils/log/log_stream_accumulator.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+import tempfile
+from itertools import islice
+from typing import IO, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from airflow.typing_compat import Self
+ from airflow.utils.log.file_task_handler import (
+ LogHandlerOutputStream,
+ StructuredLogMessage,
+ StructuredLogStream,
+ )
+
+
+class LogStreamAccumulator:
+ """
+ Memory-efficient log stream accumulator that tracks the total number of
lines while preserving the original stream.
+
+ This class captures logs from a stream and stores them in a buffer,
flushing them to disk when the buffer
+ exceeds a specified threshold. This approach optimizes memory usage while
handling large log streams.
+
+ Usage:
+
+ .. code-block:: python
+
+ with LogStreamAccumulator(stream, threshold) as log_accumulator:
+ # Get total number of lines captured
+ total_lines = log_accumulator.get_total_lines()
+
+ # Retrieve the original stream of logs
+ for log in log_accumulator.get_stream():
+ print(log)
+ """
+
+ def __init__(
+ self,
+ stream: LogHandlerOutputStream,
+ threshold: int,
+ ) -> None:
+ """
+ Initialize the LogStreamAccumulator.
+
+ Args:
+ stream: The input log stream to capture and count.
+ threshold: Maximum number of lines to keep in memory before
flushing to disk.
+ """
+ self._stream = stream
+ self._threshold = threshold
+ self._buffer: list[StructuredLogMessage] = []
+ self._disk_lines: int = 0
+ self._tmpfile: IO[str] | None = None
+
+ def _flush_buffer_to_disk(self) -> None:
+ """Flush the buffer contents to a temporary file on disk."""
+ if self._tmpfile is None:
+ self._tmpfile = tempfile.NamedTemporaryFile(delete=False,
mode="w+", encoding="utf-8")
+
+ self._disk_lines += len(self._buffer)
+ self._tmpfile.writelines(f"{log.model_dump_json()}\n" for log in
self._buffer)
+ self._tmpfile.flush()
+ self._buffer.clear()
+
+ def _capture(self) -> None:
+ """Capture logs from the stream into the buffer, flushing to disk when
threshold is reached."""
+ while True:
+ # `islice` will try to get up to `self._threshold` lines from the
stream.
+ self._buffer.extend(islice(self._stream, self._threshold))
+ # If the buffer has reached the threshold, flush it to disk.
+ if len(self._buffer) >= self._threshold:
+ self._flush_buffer_to_disk()
+ else: # If there are no more lines to capture, exit the loop.
+ break
Review Comment:
```suggestion
# If there are no more lines to capture, exit the loop.
if len(self._buffer) < self._threshold:
break
self._flush_buffer_to_disk()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]