potiuk commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1900427959
##########
airflow/utils/log/file_task_handler.py:
##########
@@ -110,30 +133,111 @@ def _parse_timestamp(line: str):
     return pendulum.parse(timestamp_str.strip("[]"))
-def _parse_timestamps_in_log_file(lines: Iterable[str]):
-    timestamp = None
-    next_timestamp = None
-    for idx, line in enumerate(lines):
-        if line:
-            with suppress(Exception):
-                # next_timestamp unchanged if line can't be parsed
-                next_timestamp = _parse_timestamp(line)
-            if next_timestamp:
-                timestamp = next_timestamp
-        yield timestamp, idx, line
+def _get_parsed_log_stream(file_path: Path) -> _ParsedLogStreamType:
+    with open(file_path) as f:
+        line_num = 0  # line number for each log line
+        # carry the last parsed timestamp across chunk boundaries, so
+        # continuation lines at the start of a chunk keep their context
+        timestamp = None
+        next_timestamp = None
+        # the file is opened in text mode, so the sentinel is "" (not b"")
+        for file_chunk in iter(partial(f.read, CHUNK_SIZE), ""):
+            if not file_chunk:
+                break
+            # parse log lines
+            lines = file_chunk.splitlines()
+            for line in lines:
+                if line:
+                    with suppress(Exception):
+                        # next_timestamp unchanged if line can't be parsed
+                        next_timestamp = _parse_timestamp(line)
+                    if next_timestamp:
+                        timestamp = next_timestamp
+
+                yield timestamp, line_num, line
+                line_num += 1
+
+
+def _sort_key(timestamp: pendulum.DateTime | None, line_num: int) -> int:
+    """
+    Generate a sort key for a log record, to be used in the K-way merge.
+
+    :param timestamp: timestamp of the log line
+    :param line_num: line number of the log line
+    :return: an integer sort key, which avoids the memory overhead of tuple keys
+    """
+    return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) * SORT_KEY_OFFSET + line_num
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+    heap: list[tuple[int, str]],
+    parsed_log_streams: list[_ParsedLogStreamType],
+) -> None:
+    """
+    Add one log record from each parsed log stream to the heap.
+
+    Exhausted log streams are removed from the list.

-def _interleave_logs(*logs):
-    records = []
-    for log in logs:
-        records.extend(_parse_timestamps_in_log_file(log.splitlines()))

+    :param heap: heap to store log records
+    :param parsed_log_streams: list of parsed log streams
+    """
+    # iterate over a copy: removing from a list while iterating over it
+    # would silently skip the element after each removed stream
+    for log_stream in list(parsed_log_streams):
+        record: _ParsedLogRecordType | None = next(log_stream, None)
+        if record is None:
+            parsed_log_streams.remove(log_stream)
+            continue
+        timestamp, line_num, line = record
+        # use an int sort key to avoid the memory overhead of tuple keys
+        heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+
+
+def _interleave_logs(*parsed_log_streams: _ParsedLogStreamType) -> Generator[str, None, None]:
+    """
+    Merge parsed log streams using a K-way merge.
+
+    Since there are multiple log streams, the incoming records are not guaranteed to be in global order.
+    By yielding only HALF_CHUNK_SIZE records once the heap size exceeds CHUNK_SIZE, we reduce the chance of emitting records out of global order.
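
For readers following along, the new helpers amount to a heap-based K-way merge keyed on a single integer. Below is a minimal, self-contained sketch of that technique; the constant values, the ParsedRecord alias, and the sort_key/interleave names are illustrative assumptions for this sketch, not Airflow's actual definitions (the real constants live elsewhere in the PR).

# Standalone sketch of the heap-based K-way merge above.
# All constants here are assumed placeholder values, not Airflow's.
from __future__ import annotations

import heapq
from datetime import datetime, timezone
from typing import Generator, Iterator, Optional

# (timestamp, line number, raw line), mirroring the parsed log records
ParsedRecord = tuple[Optional[datetime], int, str]

CHUNK_SIZE = 200              # assumed heap-size threshold (records)
HALF_CHUNK_SIZE = 100         # assumed flush batch size (records)
SORT_KEY_OFFSET = 10_000_000  # room for line numbers within one millisecond
DEFAULT_SORT_DATETIME = datetime(2000, 1, 1, tzinfo=timezone.utc)


def sort_key(timestamp: datetime | None, line_num: int) -> int:
    # millisecond timestamp shifted left, line number breaking ties:
    # one int replaces a (datetime, int) tuple as the heap key
    return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) * SORT_KEY_OFFSET + line_num


def interleave(*streams: Iterator[ParsedRecord]) -> Generator[str, None, None]:
    heap: list[tuple[int, str]] = []
    live = list(streams)
    while live:
        # pull one record from every still-live stream (round robin)
        for stream in list(live):
            record = next(stream, None)
            if record is None:
                live.remove(stream)
                continue
            ts, line_num, line = record
            heapq.heappush(heap, (sort_key(ts, line_num), line))
        # once the buffer exceeds CHUNK_SIZE, flush only HALF_CHUNK_SIZE
        # records, so late-arriving records can still slot in before
        # their neighbours are emitted
        if len(heap) > CHUNK_SIZE:
            for _ in range(HALF_CHUNK_SIZE):
                yield heapq.heappop(heap)[1]
    # all streams exhausted: drain the remainder in sorted order
    while heap:
        yield heapq.heappop(heap)[1]

Fed with several generators of already-parsed records, this yields one approximately globally ordered stream of lines. The int key is the notable design choice: a heap of (int, str) pairs is considerably lighter than one keyed on (datetime, int) tuples, which matters when merging large task logs.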
Review Comment:
Nice