Lee-W commented on code in PR #49470:
URL: https://github.com/apache/airflow/pull/49470#discussion_r2160953507
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +248,147 @@ def _parse_log_lines(
log.timestamp = coerce_datetime(log.timestamp)
timestamp = log.timestamp
yield timestamp, idx, log
+ idx += 1
+
+def _create_sort_key(timestamp: datetime | None, line_num: int) -> int:
+ """
+ Create a sort key for log record, to be used in K-way merge.
+
+ :param timestamp: timestamp of the log line
+ :param line_num: line number of the log line
+ :return: an integer as sort key to avoid overhead of memory usage
+ """
+ return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) *
SORT_KEY_OFFSET + line_num
-def _interleave_logs(*logs: str | LogMessages) ->
Iterable[StructuredLogMessage]:
- min_date = pendulum.datetime(2000, 1, 1)
- records = itertools.chain.from_iterable(_parse_log_lines(log) for log in
logs)
- last = None
- for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date,
x[1])):
- if msg != last or not timestamp: # dedupe
- yield msg
- last = msg
+def _is_sort_key_with_default_timestamp(sort_key: int) -> bool:
+ """
+ Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP.
+
+ This is used to identify log records that don't have timestamp.
+
+ :param sort_key: The sort key to check
+ :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP,
False otherwise
+ """
+ # Extract the timestamp part from the sort key (remove the line number
part)
+ timestamp_part = sort_key // SORT_KEY_OFFSET
+ return timestamp_part == DEFAULT_SORT_TIMESTAMP
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+ heap: list[tuple[int, StructuredLogMessage]],
+ parsed_log_streams: dict[int, ParsedLogStream],
+) -> None:
+ """
+ Add one log record from each parsed log stream to the heap, then remove
exhausted log streams from the dict after iterating.
+
+ :param heap: heap to store log records
+ :param parsed_log_streams: dict of parsed log streams
+ """
+ log_stream_to_remove: list[int] | None = None
+ for idx, log_stream in parsed_log_streams.items():
+ record: ParsedLog | None = next(log_stream, None)
+ if record is None:
+ if log_stream_to_remove is None:
+ log_stream_to_remove = []
+ log_stream_to_remove.append(idx)
+ continue
+ # add type hint to avoid mypy error
+ record = cast("ParsedLog", record)
+ timestamp, line_num, line = record
+ # take int as sort key to avoid overhead of memory usage
+ heapq.heappush(heap, (_create_sort_key(timestamp, line_num), line))
+ # remove empty log stream from the dict
+ if log_stream_to_remove is not None:
+ for idx in log_stream_to_remove:
+ del parsed_log_streams[idx]
Review Comment:
If that's the case, we should probably document it as well. I bet someone will
want to "refactor" it if we don't document it here. I also did some quick
experiments, and yes, there could be some overhead that we'd better avoid when
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]