jason810496 commented on code in PR #49470:
URL: https://github.com/apache/airflow/pull/49470#discussion_r2151463871
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +242,133 @@ def _parse_log_lines(
log.timestamp = coerce_datetime(log.timestamp)
timestamp = log.timestamp
yield timestamp, idx, log
+ idx += 1
-def _interleave_logs(*logs: str | LogMessages) ->
Iterable[StructuredLogMessage]:
- min_date = pendulum.datetime(2000, 1, 1)
+def _sort_key(timestamp: datetime | None, line_num: int) -> int:
+ """
+ Generate a sort key for log record, to be used in K-way merge.
- records = itertools.chain.from_iterable(_parse_log_lines(log) for log in
logs)
+ :param timestamp: timestamp of the log line
+ :param line_num: line number of the log line
+ :return: a integer as sort key to avoid overhead of memory usage
+ """
+ return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) *
SORT_KEY_OFFSET + line_num
+
+
+def _is_sort_key_with_default_timestamp(sort_key: int) -> bool:
+ """
+ Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP.
+
+ This is used to identify log records that don't have timestamp.
+
+ :param sort_key: The sort key to check
+ :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP,
False otherwise
+ """
+ # Extract the timestamp part from the sort key (remove the line number
part)
+ timestamp_part = sort_key // SORT_KEY_OFFSET
+ return timestamp_part == DEFAULT_SORT_TIMESTAMP
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+ heap: list[tuple[int, StructuredLogMessage]],
+ parsed_log_streams: dict[int, ParsedLogStream],
+) -> None:
+ """
+ Add one log record from each parsed log stream to the heap, and will
remove empty log stream from the dict after iterating.
+
+ :param heap: heap to store log records
+ :param parsed_log_streams: dict of parsed log streams
+ """
+ log_stream_to_remove: list[int] | None = None
+ for idx, log_stream in parsed_log_streams.items():
+ record: ParsedLog | None = next(log_stream, None)
+ if record is None:
+ if log_stream_to_remove is None:
+ log_stream_to_remove = []
+ log_stream_to_remove.append(idx)
+ continue
+ # add type hint to avoid mypy error
+ record = cast("ParsedLog", record)
+ timestamp, line_num, line = record
+ # take int as sort key to avoid overhead of memory usage
+ heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+ # remove empty log stream from the dict
+ if log_stream_to_remove is not None:
+ for idx in log_stream_to_remove:
+ del parsed_log_streams[idx]
+
+
+def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
+ """
+ Merge parsed log streams using K-way merge.
+
+ By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we
can reduce the chance of messing up the global order.
+ Since there are multiple log streams, we can't guarantee that the records
are in global order.
+
+ e.g.
+
+ log_stream1: ----------
+ log_stream2: ----
+ log_stream3: --------
+
+ The first record of log_stream3 is later than the fourth record of
log_stream1 !
+ :param parsed_log_streams: parsed log streams
+ :return: interleaved log stream
+ """
+ # don't need to push whole tuple into heap, which increases too much
overhead
+ # push only sort_key and line into heap
+ heap: list[tuple[int, StructuredLogMessage]] = []
+ # to allow removing empty streams while iterating, also turn the str
stream into parsed log stream
+ parsed_log_streams: dict[int, ParsedLogStream] = {
+ idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream
in enumerate(log_streams)
+ }
+
+ # keep adding records from logs until all logs are empty
last = None
- for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date,
x[1])):
- if msg != last or not timestamp: # dedupe
- yield msg
- last = msg
+ while parsed_log_streams:
+ _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams)
+
+ # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds
HEAP_DUMP_SIZE
+ if len(heap) >= HEAP_DUMP_SIZE:
+ for _ in range(HALF_HEAP_DUMP_SIZE):
+ sort_key, line = heapq.heappop(heap)
+ if line != last or
_is_sort_key_with_default_timestamp(sort_key): # dedupe
+ yield line
+ last = line
+
+ # yield remaining records
+ for _ in range(len(heap)):
+ sort_key, line = heapq.heappop(heap)
+ if line != last or _is_sort_key_with_default_timestamp(sort_key): #
dedupe
+ yield line
+ last = line
+ # free memory
+ del heap
+ del parsed_log_streams
+
+
+def _is_logs_stream_like(log):
+ """Check if the logs are stream-like."""
+ return isinstance(log, chain) or isinstance(log, GeneratorType)
+
+
+def _get_compatible_log_stream(
+ log_messages: LogMessages,
+) -> RawLogStream:
+ """
+ Convert legacy log message blobs into a generator that yields log lines.
+
+ :param log_messages: List of legacy log message strings.
+ :return: A generator that yields interleaved log lines.
+ """
+ log_streams: list[RawLogStream] = []
+ for log_message in log_messages:
+ # Append the log stream to the list
+ log_streams.append(_stream_lines_by_chunk(io.StringIO(log_message)))
Review Comment:
I would like to keep my change.
IMO, for small logs (fewer log lines than `HEAP_DUMP_SIZE`), we should avoid
flushing the logs to disk so that we can respond faster.
Additionally, they're small enough that buffering them in memory will be fine.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]