jason810496 commented on code in PR #49470:
URL: https://github.com/apache/airflow/pull/49470#discussion_r2151463871
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +242,133 @@ def _parse_log_lines(
log.timestamp = coerce_datetime(log.timestamp)
timestamp = log.timestamp
yield timestamp, idx, log
+ idx += 1
-def _interleave_logs(*logs: str | LogMessages) ->
Iterable[StructuredLogMessage]:
- min_date = pendulum.datetime(2000, 1, 1)
+def _sort_key(timestamp: datetime | None, line_num: int) -> int:
+ """
+ Generate a sort key for log record, to be used in K-way merge.
- records = itertools.chain.from_iterable(_parse_log_lines(log) for log in
logs)
+ :param timestamp: timestamp of the log line
+ :param line_num: line number of the log line
+ :return: a integer as sort key to avoid overhead of memory usage
+ """
+ return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) *
SORT_KEY_OFFSET + line_num
+
+
+def _is_sort_key_with_default_timestamp(sort_key: int) -> bool:
+ """
+ Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP.
+
+ This is used to identify log records that don't have timestamp.
+
+ :param sort_key: The sort key to check
+ :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP,
False otherwise
+ """
+ # Extract the timestamp part from the sort key (remove the line number
part)
+ timestamp_part = sort_key // SORT_KEY_OFFSET
+ return timestamp_part == DEFAULT_SORT_TIMESTAMP
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+ heap: list[tuple[int, StructuredLogMessage]],
+ parsed_log_streams: dict[int, ParsedLogStream],
+) -> None:
+ """
+ Add one log record from each parsed log stream to the heap, and will
remove empty log stream from the dict after iterating.
+
+ :param heap: heap to store log records
+ :param parsed_log_streams: dict of parsed log streams
+ """
+ log_stream_to_remove: list[int] | None = None
+ for idx, log_stream in parsed_log_streams.items():
+ record: ParsedLog | None = next(log_stream, None)
+ if record is None:
+ if log_stream_to_remove is None:
+ log_stream_to_remove = []
+ log_stream_to_remove.append(idx)
+ continue
+ # add type hint to avoid mypy error
+ record = cast("ParsedLog", record)
+ timestamp, line_num, line = record
+ # take int as sort key to avoid overhead of memory usage
+ heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+ # remove empty log stream from the dict
+ if log_stream_to_remove is not None:
+ for idx in log_stream_to_remove:
+ del parsed_log_streams[idx]
+
+
+def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
+ """
+ Merge parsed log streams using K-way merge.
+
+ By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we
can reduce the chance of messing up the global order.
+ Since there are multiple log streams, we can't guarantee that the records
are in global order.
+
+ e.g.
+
+ log_stream1: ----------
+ log_stream2: ----
+ log_stream3: --------
+
+ The first record of log_stream3 is later than the fourth record of
log_stream1 !
+ :param parsed_log_streams: parsed log streams
+ :return: interleaved log stream
+ """
+ # don't need to push whole tuple into heap, which increases too much
overhead
+ # push only sort_key and line into heap
+ heap: list[tuple[int, StructuredLogMessage]] = []
+ # to allow removing empty streams while iterating, also turn the str
stream into parsed log stream
+ parsed_log_streams: dict[int, ParsedLogStream] = {
+ idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream
in enumerate(log_streams)
+ }
+
+ # keep adding records from logs until all logs are empty
last = None
- for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date,
x[1])):
- if msg != last or not timestamp: # dedupe
- yield msg
- last = msg
+ while parsed_log_streams:
+ _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams)
+
+ # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds
HEAP_DUMP_SIZE
+ if len(heap) >= HEAP_DUMP_SIZE:
+ for _ in range(HALF_HEAP_DUMP_SIZE):
+ sort_key, line = heapq.heappop(heap)
+ if line != last or
_is_sort_key_with_default_timestamp(sort_key): # dedupe
+ yield line
+ last = line
+
+ # yield remaining records
+ for _ in range(len(heap)):
+ sort_key, line = heapq.heappop(heap)
+ if line != last or _is_sort_key_with_default_timestamp(sort_key): #
dedupe
+ yield line
+ last = line
+ # free memory
+ del heap
+ del parsed_log_streams
+
+
+def _is_logs_stream_like(log):
+ """Check if the logs are stream-like."""
+ return isinstance(log, chain) or isinstance(log, GeneratorType)
+
+
+def _get_compatible_log_stream(
+ log_messages: LogMessages,
+) -> RawLogStream:
+ """
+ Convert legacy log message blobs into a generator that yields log lines.
+
+ :param log_messages: List of legacy log message strings.
+ :return: A generator that yields interleaved log lines.
+ """
+ log_streams: list[RawLogStream] = []
+ for log_message in log_messages:
+ # Append the log stream to the list
+ log_streams.append(_stream_lines_by_chunk(io.StringIO(log_message)))
Review Comment:
I would like to keep my change.
IMO, for small logs (log lines smaller than `HEAP_DUMP_SIZE`), we should avoid
flushing the logs to disk to make the response faster.
Additionally, they're small enough, so buffering them in memory will be fine.
##########
airflow-core/src/airflow/utils/log/log_stream_accumulator.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+import tempfile
+from itertools import islice
+from typing import IO, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from airflow.typing_compat import Self
+ from airflow.utils.log.file_task_handler import (
+ LogHandlerOutputStream,
+ StructuredLogMessage,
+ StructuredLogStream,
+ )
+
+
+class LogStreamAccumulator:
+ """
+ Memory-efficient log stream accumulator that tracks the total number of
lines while preserving the original stream.
+
+ This class captures logs from a stream and stores them in a buffer,
flushing them to disk when the buffer
+ exceeds a specified threshold. This approach optimizes memory usage while
handling large log streams.
+
+ Usage:
+
+ .. code-block:: python
+
+ with LogStreamAccumulator(stream, threshold) as log_accumulator:
+ # Get total number of lines captured
+ total_lines = log_accumulator.get_total_lines()
+
+ # Retrieve the original stream of logs
+ for log in log_accumulator.get_stream():
+ print(log)
+ """
+
+ def __init__(
+ self,
+ stream: LogHandlerOutputStream,
+ threshold: int,
+ ) -> None:
+ """
+ Initialize the LogStreamAccumulator.
+
+ Args:
+ stream: The input log stream to capture and count.
+ threshold: Maximum number of lines to keep in memory before
flushing to disk.
+ """
+ self._stream = stream
+ self._threshold = threshold
+ self._buffer: list[StructuredLogMessage] = []
+ self._disk_lines: int = 0
+ self._tmpfile: IO[str] | None = None
+
+ def _flush_buffer_to_disk(self) -> None:
+ """Flush the buffer contents to a temporary file on disk."""
+ if self._tmpfile is None:
+ self._tmpfile = tempfile.NamedTemporaryFile(delete=False,
mode="w+", encoding="utf-8")
+
+ self._disk_lines += len(self._buffer)
+ self._tmpfile.writelines(f"{log.model_dump_json()}\n" for log in
self._buffer)
+ self._tmpfile.flush()
+ self._buffer.clear()
+
+ def _capture(self) -> None:
+ """Capture logs from the stream into the buffer, flushing to disk when
threshold is reached."""
+ while True:
+ # `islice` will try to get up to `self._threshold` lines from the
stream.
+ self._buffer.extend(islice(self._stream, self._threshold))
+ # If the buffer has reached the threshold, flush it to disk.
+ if len(self._buffer) >= self._threshold:
+ self._flush_buffer_to_disk()
+ else: # If there are no more lines to capture, exit the loop.
+ break
Review Comment:
I would like to keep my change.
IMO, for small logs (log lines smaller than `HEAP_DUMP_SIZE`), we should avoid
flushing the logs to disk to make the response faster.
Additionally, they're small enough, so buffering them in memory will be fine.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]