Lee-W commented on code in PR #49470:
URL: https://github.com/apache/airflow/pull/49470#discussion_r2151424340


##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +242,133 @@ def _parse_log_lines(
                 log.timestamp = coerce_datetime(log.timestamp)
                 timestamp = log.timestamp
             yield timestamp, idx, log
+        idx += 1
 
 
-def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]:
-    min_date = pendulum.datetime(2000, 1, 1)
+def _sort_key(timestamp: datetime | None, line_num: int) -> int:
+    """
+    Generate a sort key for log record, to be used in K-way merge.
 
-    records = itertools.chain.from_iterable(_parse_log_lines(log) for log in 
logs)
+    :param timestamp: timestamp of the log line
+    :param line_num: line number of the log line
+    :return: a integer as sort key to avoid overhead of memory usage
+    """
+    return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) * 
SORT_KEY_OFFSET + line_num
+
+
+def _is_sort_key_with_default_timestamp(sort_key: int) -> bool:
+    """
+    Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP.
+
+    This is used to identify log records that don't have timestamp.
+
+    :param sort_key: The sort key to check
+    :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP, 
False otherwise
+    """
+    # Extract the timestamp part from the sort key (remove the line number 
part)
+    timestamp_part = sort_key // SORT_KEY_OFFSET
+    return timestamp_part == DEFAULT_SORT_TIMESTAMP
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+    heap: list[tuple[int, StructuredLogMessage]],
+    parsed_log_streams: dict[int, ParsedLogStream],
+) -> None:
+    """
+    Add one log record from each parsed log stream to the heap, and will 
remove empty log stream from the dict after iterating.
+
+    :param heap: heap to store log records
+    :param parsed_log_streams: dict of parsed log streams
+    """
+    log_stream_to_remove: list[int] | None = None
+    for idx, log_stream in parsed_log_streams.items():
+        record: ParsedLog | None = next(log_stream, None)
+        if record is None:
+            if log_stream_to_remove is None:
+                log_stream_to_remove = []
+            log_stream_to_remove.append(idx)
+            continue
+        # add type hint to avoid mypy error
+        record = cast("ParsedLog", record)
+        timestamp, line_num, line = record
+        # take int as sort key to avoid overhead of memory usage
+        heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+    # remove empty log stream from the dict
+    if log_stream_to_remove is not None:
+        for idx in log_stream_to_remove:
+            del parsed_log_streams[idx]
+
+
+def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
+    """
+    Merge parsed log streams using K-way merge.
+
+    By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we 
can reduce the chance of messing up the global order.
+    Since there are multiple log streams, we can't guarantee that the records 
are in global order.
+
+    e.g.
+
+    log_stream1: ----------
+    log_stream2:   ----
+    log_stream3:     --------
+
+    The first record of log_stream3 is later than the fourth record of 
log_stream1 !
+    :param parsed_log_streams: parsed log streams
+    :return: interleaved log stream
+    """
+    # don't need to push whole tuple into heap, which increases too much 
overhead
+    # push only sort_key and line into heap
+    heap: list[tuple[int, StructuredLogMessage]] = []
+    # to allow removing empty streams while iterating, also turn the str 
stream into parsed log stream
+    parsed_log_streams: dict[int, ParsedLogStream] = {
+        idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream 
in enumerate(log_streams)
+    }
+
+    # keep adding records from logs until all logs are empty
     last = None
-    for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date, 
x[1])):
-        if msg != last or not timestamp:  # dedupe
-            yield msg
-        last = msg
+    while parsed_log_streams:
+        _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams)
+
+        # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds 
HEAP_DUMP_SIZE
+        if len(heap) >= HEAP_DUMP_SIZE:
+            for _ in range(HALF_HEAP_DUMP_SIZE):
+                sort_key, line = heapq.heappop(heap)
+                if line != last or 
_is_sort_key_with_default_timestamp(sort_key):  # dedupe
+                    yield line
+                last = line
+
+    # yield remaining records
+    for _ in range(len(heap)):
+        sort_key, line = heapq.heappop(heap)
+        if line != last or _is_sort_key_with_default_timestamp(sort_key):  # 
dedupe
+            yield line
+        last = line
+    # free memory
+    del heap
+    del parsed_log_streams
+
+
+def _is_logs_stream_like(log):

Review Comment:
   ```suggestion
   def _is_logs_stream_like(log) -> bool:
   ```



##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +242,133 @@ def _parse_log_lines(
                 log.timestamp = coerce_datetime(log.timestamp)
                 timestamp = log.timestamp
             yield timestamp, idx, log
+        idx += 1
 
 
-def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]:
-    min_date = pendulum.datetime(2000, 1, 1)
+def _sort_key(timestamp: datetime | None, line_num: int) -> int:
+    """
+    Generate a sort key for log record, to be used in K-way merge.
 
-    records = itertools.chain.from_iterable(_parse_log_lines(log) for log in 
logs)
+    :param timestamp: timestamp of the log line
+    :param line_num: line number of the log line
+    :return: a integer as sort key to avoid overhead of memory usage
+    """
+    return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) * 
SORT_KEY_OFFSET + line_num
+
+
+def _is_sort_key_with_default_timestamp(sort_key: int) -> bool:
+    """
+    Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP.
+
+    This is used to identify log records that don't have timestamp.
+
+    :param sort_key: The sort key to check
+    :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP, 
False otherwise
+    """
+    # Extract the timestamp part from the sort key (remove the line number 
part)
+    timestamp_part = sort_key // SORT_KEY_OFFSET
+    return timestamp_part == DEFAULT_SORT_TIMESTAMP
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+    heap: list[tuple[int, StructuredLogMessage]],
+    parsed_log_streams: dict[int, ParsedLogStream],
+) -> None:
+    """
+    Add one log record from each parsed log stream to the heap, and will 
remove empty log stream from the dict after iterating.
+
+    :param heap: heap to store log records
+    :param parsed_log_streams: dict of parsed log streams
+    """
+    log_stream_to_remove: list[int] | None = None
+    for idx, log_stream in parsed_log_streams.items():
+        record: ParsedLog | None = next(log_stream, None)
+        if record is None:
+            if log_stream_to_remove is None:
+                log_stream_to_remove = []
+            log_stream_to_remove.append(idx)
+            continue
+        # add type hint to avoid mypy error
+        record = cast("ParsedLog", record)
+        timestamp, line_num, line = record
+        # take int as sort key to avoid overhead of memory usage
+        heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+    # remove empty log stream from the dict
+    if log_stream_to_remove is not None:
+        for idx in log_stream_to_remove:
+            del parsed_log_streams[idx]
+
+
+def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
+    """
+    Merge parsed log streams using K-way merge.
+
+    By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we 
can reduce the chance of messing up the global order.
+    Since there are multiple log streams, we can't guarantee that the records 
are in global order.
+
+    e.g.
+
+    log_stream1: ----------
+    log_stream2:   ----
+    log_stream3:     --------
+
+    The first record of log_stream3 is later than the fourth record of 
log_stream1 !
+    :param parsed_log_streams: parsed log streams
+    :return: interleaved log stream
+    """
+    # don't need to push whole tuple into heap, which increases too much 
overhead
+    # push only sort_key and line into heap
+    heap: list[tuple[int, StructuredLogMessage]] = []
+    # to allow removing empty streams while iterating, also turn the str 
stream into parsed log stream
+    parsed_log_streams: dict[int, ParsedLogStream] = {
+        idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream 
in enumerate(log_streams)
+    }
+
+    # keep adding records from logs until all logs are empty
     last = None
-    for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date, 
x[1])):
-        if msg != last or not timestamp:  # dedupe
-            yield msg
-        last = msg
+    while parsed_log_streams:
+        _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams)
+
+        # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds 
HEAP_DUMP_SIZE
+        if len(heap) >= HEAP_DUMP_SIZE:
+            for _ in range(HALF_HEAP_DUMP_SIZE):
+                sort_key, line = heapq.heappop(heap)
+                if line != last or 
_is_sort_key_with_default_timestamp(sort_key):  # dedupe
+                    yield line
+                last = line
+
+    # yield remaining records
+    for _ in range(len(heap)):
+        sort_key, line = heapq.heappop(heap)
+        if line != last or _is_sort_key_with_default_timestamp(sort_key):  # 
dedupe
+            yield line
+        last = line

Review Comment:
   ```suggestion
           sort_key, line = heapq.heappop(heap)
           if line != last or _is_sort_key_with_default_timestamp(sort_key):  # 
dedupe
               yield line
           last = line
   ```
   
   nit: we might be able to extract it as a function as it looks similar to the 
lines above



##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +242,133 @@ def _parse_log_lines(
                 log.timestamp = coerce_datetime(log.timestamp)
                 timestamp = log.timestamp
             yield timestamp, idx, log
+        idx += 1
 
 
-def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]:
-    min_date = pendulum.datetime(2000, 1, 1)
+def _sort_key(timestamp: datetime | None, line_num: int) -> int:
+    """
+    Generate a sort key for log record, to be used in K-way merge.
 
-    records = itertools.chain.from_iterable(_parse_log_lines(log) for log in 
logs)
+    :param timestamp: timestamp of the log line
+    :param line_num: line number of the log line
+    :return: a integer as sort key to avoid overhead of memory usage
+    """
+    return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) * 
SORT_KEY_OFFSET + line_num
+
+
+def _is_sort_key_with_default_timestamp(sort_key: int) -> bool:
+    """
+    Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP.
+
+    This is used to identify log records that don't have timestamp.
+
+    :param sort_key: The sort key to check
+    :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP, 
False otherwise
+    """
+    # Extract the timestamp part from the sort key (remove the line number 
part)
+    timestamp_part = sort_key // SORT_KEY_OFFSET
+    return timestamp_part == DEFAULT_SORT_TIMESTAMP
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+    heap: list[tuple[int, StructuredLogMessage]],
+    parsed_log_streams: dict[int, ParsedLogStream],
+) -> None:
+    """
+    Add one log record from each parsed log stream to the heap, and will 
remove empty log stream from the dict after iterating.
+
+    :param heap: heap to store log records
+    :param parsed_log_streams: dict of parsed log streams
+    """
+    log_stream_to_remove: list[int] | None = None
+    for idx, log_stream in parsed_log_streams.items():
+        record: ParsedLog | None = next(log_stream, None)
+        if record is None:
+            if log_stream_to_remove is None:
+                log_stream_to_remove = []
+            log_stream_to_remove.append(idx)
+            continue
+        # add type hint to avoid mypy error
+        record = cast("ParsedLog", record)
+        timestamp, line_num, line = record
+        # take int as sort key to avoid overhead of memory usage
+        heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+    # remove empty log stream from the dict
+    if log_stream_to_remove is not None:
+        for idx in log_stream_to_remove:
+            del parsed_log_streams[idx]
+
+
+def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
+    """
+    Merge parsed log streams using K-way merge.
+
+    By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we 
can reduce the chance of messing up the global order.
+    Since there are multiple log streams, we can't guarantee that the records 
are in global order.
+
+    e.g.
+
+    log_stream1: ----------
+    log_stream2:   ----
+    log_stream3:     --------
+
+    The first record of log_stream3 is later than the fourth record of 
log_stream1 !
+    :param parsed_log_streams: parsed log streams
+    :return: interleaved log stream
+    """
+    # don't need to push whole tuple into heap, which increases too much 
overhead
+    # push only sort_key and line into heap
+    heap: list[tuple[int, StructuredLogMessage]] = []
+    # to allow removing empty streams while iterating, also turn the str 
stream into parsed log stream
+    parsed_log_streams: dict[int, ParsedLogStream] = {
+        idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream 
in enumerate(log_streams)
+    }
+
+    # keep adding records from logs until all logs are empty
     last = None
-    for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date, 
x[1])):
-        if msg != last or not timestamp:  # dedupe
-            yield msg
-        last = msg
+    while parsed_log_streams:
+        _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams)
+
+        # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds 
HEAP_DUMP_SIZE
+        if len(heap) >= HEAP_DUMP_SIZE:
+            for _ in range(HALF_HEAP_DUMP_SIZE):
+                sort_key, line = heapq.heappop(heap)
+                if line != last or 
_is_sort_key_with_default_timestamp(sort_key):  # dedupe
+                    yield line
+                last = line
+
+    # yield remaining records
+    for _ in range(len(heap)):
+        sort_key, line = heapq.heappop(heap)
+        if line != last or _is_sort_key_with_default_timestamp(sort_key):  # 
dedupe
+            yield line
+        last = line
+    # free memory
+    del heap
+    del parsed_log_streams
+
+
+def _is_logs_stream_like(log):
+    """Check if the logs are stream-like."""
+    return isinstance(log, chain) or isinstance(log, GeneratorType)

Review Comment:
   ```suggestion
       return isinstance(log, (chain, GeneratorType))
   ```



##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +242,133 @@ def _parse_log_lines(
                 log.timestamp = coerce_datetime(log.timestamp)
                 timestamp = log.timestamp
             yield timestamp, idx, log
+        idx += 1
 
 
-def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]:
-    min_date = pendulum.datetime(2000, 1, 1)
+def _sort_key(timestamp: datetime | None, line_num: int) -> int:
+    """
+    Generate a sort key for log record, to be used in K-way merge.
 
-    records = itertools.chain.from_iterable(_parse_log_lines(log) for log in 
logs)
+    :param timestamp: timestamp of the log line
+    :param line_num: line number of the log line
+    :return: a integer as sort key to avoid overhead of memory usage
+    """
+    return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) * 
SORT_KEY_OFFSET + line_num
+
+
+def _is_sort_key_with_default_timestamp(sort_key: int) -> bool:
+    """
+    Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP.
+
+    This is used to identify log records that don't have timestamp.
+
+    :param sort_key: The sort key to check
+    :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP, 
False otherwise
+    """
+    # Extract the timestamp part from the sort key (remove the line number 
part)
+    timestamp_part = sort_key // SORT_KEY_OFFSET
+    return timestamp_part == DEFAULT_SORT_TIMESTAMP
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+    heap: list[tuple[int, StructuredLogMessage]],
+    parsed_log_streams: dict[int, ParsedLogStream],
+) -> None:
+    """
+    Add one log record from each parsed log stream to the heap, and will 
remove empty log stream from the dict after iterating.
+
+    :param heap: heap to store log records
+    :param parsed_log_streams: dict of parsed log streams
+    """
+    log_stream_to_remove: list[int] | None = None
+    for idx, log_stream in parsed_log_streams.items():
+        record: ParsedLog | None = next(log_stream, None)
+        if record is None:
+            if log_stream_to_remove is None:
+                log_stream_to_remove = []
+            log_stream_to_remove.append(idx)
+            continue
+        # add type hint to avoid mypy error
+        record = cast("ParsedLog", record)
+        timestamp, line_num, line = record
+        # take int as sort key to avoid overhead of memory usage
+        heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+    # remove empty log stream from the dict
+    if log_stream_to_remove is not None:
+        for idx in log_stream_to_remove:
+            del parsed_log_streams[idx]
+
+
+def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
+    """
+    Merge parsed log streams using K-way merge.
+
+    By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we 
can reduce the chance of messing up the global order.
+    Since there are multiple log streams, we can't guarantee that the records 
are in global order.
+
+    e.g.
+
+    log_stream1: ----------
+    log_stream2:   ----
+    log_stream3:     --------
+
+    The first record of log_stream3 is later than the fourth record of 
log_stream1 !
+    :param parsed_log_streams: parsed log streams
+    :return: interleaved log stream
+    """
+    # don't need to push whole tuple into heap, which increases too much 
overhead
+    # push only sort_key and line into heap
+    heap: list[tuple[int, StructuredLogMessage]] = []
+    # to allow removing empty streams while iterating, also turn the str 
stream into parsed log stream
+    parsed_log_streams: dict[int, ParsedLogStream] = {
+        idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream 
in enumerate(log_streams)
+    }
+
+    # keep adding records from logs until all logs are empty
     last = None
-    for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date, 
x[1])):
-        if msg != last or not timestamp:  # dedupe
-            yield msg
-        last = msg
+    while parsed_log_streams:
+        _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams)
+
+        # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds 
HEAP_DUMP_SIZE
+        if len(heap) >= HEAP_DUMP_SIZE:
+            for _ in range(HALF_HEAP_DUMP_SIZE):
+                sort_key, line = heapq.heappop(heap)
+                if line != last or 
_is_sort_key_with_default_timestamp(sort_key):  # dedupe
+                    yield line
+                last = line
+
+    # yield remaining records
+    for _ in range(len(heap)):
+        sort_key, line = heapq.heappop(heap)
+        if line != last or _is_sort_key_with_default_timestamp(sort_key):  # 
dedupe
+            yield line
+        last = line
+    # free memory
+    del heap
+    del parsed_log_streams
+
+
+def _is_logs_stream_like(log):
+    """Check if the logs are stream-like."""
+    return isinstance(log, chain) or isinstance(log, GeneratorType)
+
+
+def _get_compatible_log_stream(
+    log_messages: LogMessages,
+) -> RawLogStream:
+    """
+    Convert legacy log message blobs into a generator that yields log lines.
+
+    :param log_messages: List of legacy log message strings.
+    :return: A generator that yields interleaved log lines.
+    """
+    log_streams: list[RawLogStream] = []
+    for log_message in log_messages:
+        # Append the log stream to the list
+        log_streams.append(_stream_lines_by_chunk(io.StringIO(log_message)))

Review Comment:
   ```suggestion
       log_streams: list[RawLogStream] = [
           _stream_lines_by_chunk(io.StringIO(log_message))
           for log_message in log_messages
       ]
   ```



##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -36,24 +39,66 @@
 from airflow.configuration import conf
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.utils.helpers import parse_template_string, render_template
+from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
 from airflow.utils.log.logging_mixin import SetContextPropagate
 from airflow.utils.log.non_caching_file_handler import 
NonCachingRotatingFileHandler
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import State, TaskInstanceState
 
 if TYPE_CHECKING:
+    from requests import Response
+
     from airflow.executors.base_executor import BaseExecutor
     from airflow.models.taskinstance import TaskInstance
     from airflow.models.taskinstancehistory import TaskInstanceHistory
     from airflow.typing_compat import TypeAlias
 
+CHUNK_SIZE = 1024 * 1024 * 5  # 5MB
+DEFAULT_SORT_DATETIME = pendulum.datetime(2000, 1, 1)
+DEFAULT_SORT_TIMESTAMP = int(DEFAULT_SORT_DATETIME.timestamp() * 1000)
+SORT_KEY_OFFSET = 10000000
+"""An offset used by the _sort_key utility.
+
+Assuming 50 characters per line, an offset of 10,000,000 can represent 
approximately 500 MB of file data, which is sufficient for use as a constant.
+"""
+HEAP_DUMP_SIZE = 5000
+HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
 
 # These types are similar, but have distinct names to make processing them 
less error prone
-LogMessages: TypeAlias = Union[list["StructuredLogMessage"], list[str]]
-"""The log messages themselves, either in already sturcutured form, or a 
single string blob to be parsed later"""
+LogMessages: TypeAlias = list[str]
+"""The legacy format of log messages, represented as a single string blob to 
be parsed later."""
 LogSourceInfo: TypeAlias = list[str]
 """Information _about_ the log fetching process for display to a user"""
 LogMetadata: TypeAlias = dict[str, Any]

Review Comment:
   not sure whether TypedDict would be better



##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -166,17 +242,133 @@ def _parse_log_lines(
                 log.timestamp = coerce_datetime(log.timestamp)
                 timestamp = log.timestamp
             yield timestamp, idx, log
+        idx += 1
 
 
-def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]:
-    min_date = pendulum.datetime(2000, 1, 1)
+def _sort_key(timestamp: datetime | None, line_num: int) -> int:

Review Comment:
   ```suggestion
   def _create_sort_key(timestamp: datetime | None, line_num: int) -> int:
   ```
   
   not sure whether it's better 🤔 it kinda confused me when I saw `sort_key` is 
a noun and `_sort_key` looks like a verb that returns `sort_key`



##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -36,24 +39,66 @@
 from airflow.configuration import conf
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.utils.helpers import parse_template_string, render_template
+from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
 from airflow.utils.log.logging_mixin import SetContextPropagate
 from airflow.utils.log.non_caching_file_handler import 
NonCachingRotatingFileHandler
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import State, TaskInstanceState
 
 if TYPE_CHECKING:
+    from requests import Response
+
     from airflow.executors.base_executor import BaseExecutor
     from airflow.models.taskinstance import TaskInstance
     from airflow.models.taskinstancehistory import TaskInstanceHistory
     from airflow.typing_compat import TypeAlias
 
+CHUNK_SIZE = 1024 * 1024 * 5  # 5MB
+DEFAULT_SORT_DATETIME = pendulum.datetime(2000, 1, 1)
+DEFAULT_SORT_TIMESTAMP = int(DEFAULT_SORT_DATETIME.timestamp() * 1000)
+SORT_KEY_OFFSET = 10000000
+"""An offset used by the _sort_key utility.
+
+Assuming 50 characters per line, an offset of 10,000,000 can represent 
approximately 500 MB of file data, which is sufficient for use as a constant.
+"""
+HEAP_DUMP_SIZE = 5000
+HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
 
 # These types are similar, but have distinct names to make processing them 
less error prone
-LogMessages: TypeAlias = Union[list["StructuredLogMessage"], list[str]]
-"""The log messages themselves, either in already sturcutured form, or a 
single string blob to be parsed later"""
+LogMessages: TypeAlias = list[str]
+"""The legacy format of log messages, represented as a single string blob to 
be parsed later."""
 LogSourceInfo: TypeAlias = list[str]
 """Information _about_ the log fetching process for display to a user"""
 LogMetadata: TypeAlias = dict[str, Any]
+"""Metadata about the log fetching process, including `end_of_log` and 
`log_pos`.
+
+- `end_of_log`: Boolean. Indicates if the log has ended.
+- `log_pos`: Integer. The absolute character position up to which the log was 
retrieved across all sources.
+"""
+RawLogStream: TypeAlias = Generator[str, None, None]
+"""Raw log stream, containing unparsed log lines."""
+LegacyLogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages]

Review Comment:
   By `Legacy`, do you mean Airflow 2, or the state of the code before this PR?



##########
airflow-core/src/airflow/utils/log/log_stream_accumulator.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+import tempfile
+from itertools import islice
+from typing import IO, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from airflow.typing_compat import Self
+    from airflow.utils.log.file_task_handler import (
+        LogHandlerOutputStream,
+        StructuredLogMessage,
+        StructuredLogStream,
+    )
+
+
+class LogStreamAccumulator:
+    """
+    Memory-efficient log stream accumulator that tracks the total number of 
lines while preserving the original stream.
+
+    This class captures logs from a stream and stores them in a buffer, 
flushing them to disk when the buffer
+    exceeds a specified threshold. This approach optimizes memory usage while 
handling large log streams.
+
+    Usage:
+
+    .. code-block:: python
+
+        with LogStreamAccumulator(stream, threshold) as log_accumulator:
+            # Get total number of lines captured
+            total_lines = log_accumulator.get_total_lines()
+
+            # Retrieve the original stream of logs
+            for log in log_accumulator.get_stream():
+                print(log)
+    """
+
+    def __init__(
+        self,
+        stream: LogHandlerOutputStream,
+        threshold: int,
+    ) -> None:
+        """
+        Initialize the LogStreamAccumulator.
+
+        Args:
+            stream: The input log stream to capture and count.
+            threshold: Maximum number of lines to keep in memory before 
flushing to disk.
+        """
+        self._stream = stream
+        self._threshold = threshold
+        self._buffer: list[StructuredLogMessage] = []
+        self._disk_lines: int = 0
+        self._tmpfile: IO[str] | None = None
+
+    def _flush_buffer_to_disk(self) -> None:
+        """Flush the buffer contents to a temporary file on disk."""
+        if self._tmpfile is None:
+            self._tmpfile = tempfile.NamedTemporaryFile(delete=False, 
mode="w+", encoding="utf-8")
+
+        self._disk_lines += len(self._buffer)
+        self._tmpfile.writelines(f"{log.model_dump_json()}\n" for log in 
self._buffer)
+        self._tmpfile.flush()
+        self._buffer.clear()
+
+    def _capture(self) -> None:
+        """Capture logs from the stream into the buffer, flushing to disk when 
threshold is reached."""
+        while True:
+            # `islice` will try to get up to `self._threshold` lines from the 
stream.
+            self._buffer.extend(islice(self._stream, self._threshold))
+            # If the buffer has reached the threshold, flush it to disk.
+            if len(self._buffer) >= self._threshold:
+                self._flush_buffer_to_disk()
+            else:  # If there are no more lines to capture, exit the loop.
+                break
+
+    def _cleanup(self) -> None:
+        """Clean up the temporary file if it exists."""
+        self._buffer.clear()
+        if self._tmpfile:
+            self._tmpfile.close()
+            os.remove(self._tmpfile.name)
+            self._tmpfile = None
+
+    def get_total_lines(self) -> int:
+        """
+        Return the total number of lines captured from the stream.
+
+        Returns:
+            The sum of lines stored in the buffer and lines written to disk.
+        """
+        return self._disk_lines + len(self._buffer)
+
+    def get_stream(self) -> StructuredLogStream:

Review Comment:
   not sure whether making it a property makes sense.



##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -370,22 +562,38 @@ def _read(
         # initializing the handler. Thus explicitly getting log location
         # is needed to get correct log path.
         worker_log_rel_path = self._render_filename(ti, try_number)
+        sources: LogSourceInfo = []
         source_list: list[str] = []
-        remote_logs: LogMessages | None = []
-        local_logs: list[str] = []
-        sources: list[str] = []
-        executor_logs: list[str] = []
-        served_logs: LogMessages = []
+        remote_logs: list[RawLogStream] = []
+        local_logs: list[RawLogStream] = []
+        executor_logs: list[RawLogStream] = []
+        served_logs: list[RawLogStream] = []
         with suppress(NotImplementedError):
-            sources, remote_logs = self._read_remote_logs(ti, try_number, 
metadata)
-
+            sources, logs = self._read_remote_logs(ti, try_number, metadata)
+            if not len(logs):

Review Comment:
   ```suggestion
               if not logs:
   ```



##########
airflow-core/src/airflow/utils/serve_logs.py:
##########
@@ -135,7 +135,7 @@ def validate_pre_signed_url():
 
     @flask_app.route("/log/<path:filename>")
     def serve_logs_view(filename):
-        return send_from_directory(log_directory, filename, mimetype="application/json", as_attachment=False)
+        return send_from_directory(log_directory, filename, mimetype="text/plain", as_attachment=False)

Review Comment:
   May I know why this was changed?



##########
airflow-core/src/airflow/utils/log/log_stream_accumulator.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+import tempfile
+from itertools import islice
+from typing import IO, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from airflow.typing_compat import Self
+    from airflow.utils.log.file_task_handler import (
+        LogHandlerOutputStream,
+        StructuredLogMessage,
+        StructuredLogStream,
+    )
+
+
+class LogStreamAccumulator:
+    """
+    Memory-efficient log stream accumulator that tracks the total number of lines while preserving the original stream.
+
+    This class captures logs from a stream and stores them in a buffer, flushing them to disk when the buffer
+    exceeds a specified threshold. This approach optimizes memory usage while handling large log streams.
+
+    Usage:
+
+    .. code-block:: python
+
+        with LogStreamAccumulator(stream, threshold) as log_accumulator:
+            # Get total number of lines captured
+            total_lines = log_accumulator.get_total_lines()
+
+            # Retrieve the original stream of logs
+            for log in log_accumulator.get_stream():
+                print(log)
+    """
+
+    def __init__(
+        self,
+        stream: LogHandlerOutputStream,
+        threshold: int,
+    ) -> None:
+        """
+        Initialize the LogStreamAccumulator.
+
+        :param stream: the input log stream to capture and count
+        :param threshold: maximum number of lines to keep in memory before flushing to disk
+        """
+        self._stream = stream
+        self._threshold = threshold
+        # In-memory buffer; flushed to a temp file once it reaches the threshold.
+        self._buffer: list[StructuredLogMessage] = []
+        # Count of lines already flushed to the temp file on disk.
+        self._disk_lines: int = 0
+        # Lazily-created temp file; stays None until the first flush.
+        self._tmpfile: IO[str] | None = None
+
+    def _flush_buffer_to_disk(self) -> None:
+        """Flush the buffered log messages to a temporary file on disk, one JSON document per line."""
+        # Create the temp file lazily on first flush; delete=False because the file
+        # must survive close() until _cleanup() removes it explicitly.
+        if self._tmpfile is None:
+            self._tmpfile = tempfile.NamedTemporaryFile(delete=False, mode="w+", encoding="utf-8")
+
+        self._disk_lines += len(self._buffer)
+        self._tmpfile.writelines(f"{log.model_dump_json()}\n" for log in self._buffer)
+        self._tmpfile.flush()
+        self._buffer.clear()
+
+    def _capture(self) -> None:
+        """Capture logs from the stream into the buffer, flushing to disk whenever the threshold is reached."""
+        while True:
+            # `islice` pulls at most `self._threshold` lines from the stream per pass.
+            self._buffer.extend(islice(self._stream, self._threshold))
+            # A full batch means the stream may hold more lines: flush and keep reading.
+            if len(self._buffer) >= self._threshold:
+                self._flush_buffer_to_disk()
+            else:  # A short batch means the stream is exhausted; stop.
+                break
+
+    def _cleanup(self) -> None:
+        """Release the in-memory buffer and remove the temporary file, if one was created."""
+        self._buffer.clear()
+        if self._tmpfile:
+            self._tmpfile.close()
+            # The temp file is created with delete=False, so it must be removed explicitly.
+            os.remove(self._tmpfile.name)
+            self._tmpfile = None
+
+    def get_total_lines(self) -> int:

Review Comment:
   not sure whether making it a property makes sense.



##########
airflow-core/src/airflow/utils/log/log_reader.py:
##########
@@ -72,7 +73,7 @@ def read_log_chunks(
         contain information about the task log which can enable you read logs 
to the
         end.
         """
-        return self.log_handler.read(ti, try_number, metadata=metadata)
+        return self.log_handler.read(ti, try_number, metadata)

Review Comment:
   why this change?



##########
airflow-core/src/airflow/utils/log/log_stream_accumulator.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+import tempfile
+from itertools import islice
+from typing import IO, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from airflow.typing_compat import Self
+    from airflow.utils.log.file_task_handler import (
+        LogHandlerOutputStream,
+        StructuredLogMessage,
+        StructuredLogStream,
+    )
+
+
+class LogStreamAccumulator:
+    """
+    Memory-efficient log stream accumulator that tracks the total number of lines while preserving the original stream.
+
+    This class captures logs from a stream and stores them in a buffer, flushing them to disk when the buffer
+    exceeds a specified threshold. This approach optimizes memory usage while handling large log streams.
+
+    Usage:
+
+    .. code-block:: python
+
+        with LogStreamAccumulator(stream, threshold) as log_accumulator:
+            # Get total number of lines captured
+            total_lines = log_accumulator.get_total_lines()
+
+            # Retrieve the original stream of logs
+            for log in log_accumulator.get_stream():
+                print(log)
+    """
+
+    def __init__(
+        self,
+        stream: LogHandlerOutputStream,
+        threshold: int,
+    ) -> None:
+        """
+        Initialize the LogStreamAccumulator.
+
+        :param stream: the input log stream to capture and count
+        :param threshold: maximum number of lines to keep in memory before flushing to disk
+        """
+        self._stream = stream
+        self._threshold = threshold
+        # In-memory buffer; flushed to a temp file once it reaches the threshold.
+        self._buffer: list[StructuredLogMessage] = []
+        # Count of lines already flushed to the temp file on disk.
+        self._disk_lines: int = 0
+        # Lazily-created temp file; stays None until the first flush.
+        self._tmpfile: IO[str] | None = None
+
+    def _flush_buffer_to_disk(self) -> None:
+        """Flush the buffered log messages to a temporary file on disk, one JSON document per line."""
+        # Create the temp file lazily on first flush; delete=False because the file
+        # must survive close() until _cleanup() removes it explicitly.
+        if self._tmpfile is None:
+            self._tmpfile = tempfile.NamedTemporaryFile(delete=False, mode="w+", encoding="utf-8")
+
+        self._disk_lines += len(self._buffer)
+        self._tmpfile.writelines(f"{log.model_dump_json()}\n" for log in self._buffer)
+        self._tmpfile.flush()
+        self._buffer.clear()
+
+    def _capture(self) -> None:
+        """Capture logs from the stream into the buffer, flushing to disk whenever the threshold is reached."""
+        while True:
+            # `islice` pulls at most `self._threshold` lines from the stream per pass.
+            self._buffer.extend(islice(self._stream, self._threshold))
+            # A full batch means the stream may hold more lines: flush and keep reading.
+            if len(self._buffer) >= self._threshold:
+                self._flush_buffer_to_disk()
+            else:  # A short batch means the stream is exhausted; stop.
+                break

Review Comment:
   ```suggestion
               # If there are no more lines to capture, exit the loop.
               if len(self._buffer) < self._threshold:
                   break
               self._flush_buffer_to_disk()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to