This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 63ba16388dd Add log buffering to increase performance when streaming 
big logs (#63180)
63ba16388dd is described below

commit 63ba16388dd7a60fcda4296df66dfb6a2843eef1
Author: Software Developer <[email protected]>
AuthorDate: Mon Mar 30 14:06:45 2026 +0200

    Add log buffering to increase performance when streaming big logs (#63180)
    
    * Add buffering back to speed up log loading.
    
    * Fix scrolling for big logs.
    
    * Restore `# type: ignore[arg-type]`. Apply PR review remarks.
    
    * Add buffering back to speed up log loading.
    
    * Fix scrolling for big logs.
    
    * Restore `# type: ignore[arg-type]`. Apply PR review remarks.
    
    * Make buffer size configurable.
    
    * Restore some comments.
---
 .../api_fastapi/core_api/routes/public/log.py      | 20 ++++++++-
 .../src/airflow/config_templates/config.yml        |  8 ++++
 .../src/pages/TaskInstance/Logs/TaskLogContent.tsx | 51 +++++++++++++++++++++-
 3 files changed, 77 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
index 20379876ebe..6b41aa3f5ac 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import contextlib
 import textwrap
+from collections.abc import Generator, Iterable
 
 from fastapi import Depends, HTTPException, Request, status
 from fastapi.responses import StreamingResponse
@@ -35,11 +36,14 @@ from airflow.api_fastapi.common.types import Mimetype
 from airflow.api_fastapi.core_api.datamodels.log import 
ExternalLogUrlResponse, TaskInstancesLogResponse
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
 from airflow.api_fastapi.core_api.security import DagAccessEntity, 
requires_access_dag
+from airflow.configuration import conf
 from airflow.exceptions import TaskNotFound
 from airflow.models import TaskInstance, Trigger
 from airflow.models.taskinstancehistory import TaskInstanceHistory
 from airflow.utils.log.log_reader import TaskLogReader
 
+_NDJSON_BATCH_SIZE = conf.getint("api", "log_stream_buffer_size")
+
 task_instances_log_router = AirflowRouter(
     tags=["Task Instance"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
 )
@@ -59,6 +63,19 @@ ndjson_example_response_for_get_log = {
 }
 
 
+def _buffered_ndjson_stream(
+    raw_stream: Iterable[str],
+) -> Generator[str, None, None]:
+    buf: list[str] = []
+    for line in raw_stream:
+        buf.append(line)
+        if len(buf) >= _NDJSON_BATCH_SIZE:
+            yield "".join(buf)
+            buf.clear()
+    if buf:
+        yield "".join(buf)
+
+
 @task_instances_log_router.get(
     "/{task_id}/logs/{try_number}",
     responses={
@@ -146,7 +163,8 @@ def get_log(
 
     if accept == Mimetype.NDJSON:  # only specified application/x-ndjson will 
return streaming response
         # LogMetadata(TypedDict) is used as type annotation for log_reader; 
added ignore to suppress mypy error
-        log_stream = task_log_reader.read_log_stream(ti, try_number, metadata) 
 # type: ignore[arg-type]
+        raw_stream = task_log_reader.read_log_stream(ti, try_number, metadata) 
 # type: ignore[arg-type]
+        log_stream = _buffered_ndjson_stream(raw_stream)
         headers = None
         if not metadata.get("end_of_log", False):
             headers = {
diff --git a/airflow-core/src/airflow/config_templates/config.yml 
b/airflow-core/src/airflow/config_templates/config.yml
index 55313198d5b..2f1c63a21c1 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1700,6 +1700,14 @@ api:
       type: string
       example: path/to/logging_config.yaml
       default: ~
+    log_stream_buffer_size:
+      description: |
+        Number of log lines to buffer before flushing to the client when 
streaming task logs
+        in NDJSON format. Larger values reduce HTTP overhead but increase 
time-to-first-byte.
+      version_added: 3.2.0
+      type: integer
+      example: ~
+      default: "500"
     ssl_cert:
       description: |
         Paths to the SSL certificate and key for the api server. When both are
diff --git 
a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx 
b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx
index f6df9e1c420..59ba801bacc 100644
--- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx
@@ -18,7 +18,7 @@
  */
 import { Box, Code, VStack, IconButton } from "@chakra-ui/react";
 import { useVirtualizer } from "@tanstack/react-virtual";
-import { type JSX, useLayoutEffect, useRef } from "react";
+import { type JSX, useLayoutEffect, useRef, useCallback, useEffect } from 
"react";
 import { useHotkeys } from "react-hotkeys-hook";
 import { useTranslation } from "react-i18next";
 import { FiChevronDown, FiChevronUp } from "react-icons/fi";
@@ -37,6 +37,9 @@ export type TaskLogContentProps = {
   readonly wrap: boolean;
 };
 
+// How close to the bottom (in px) before we consider the user "at the bottom"
+const SCROLL_BOTTOM_THRESHOLD = 100;
+
 const ScrollToButton = ({
   direction,
   onClick,
@@ -83,6 +86,12 @@ export const TaskLogContent = ({ error, isLoading, logError, 
parsedLogs, wrap }:
   const hash = location.hash.replace("#", "");
   const parentRef = useRef<HTMLDivElement | null>(null);
 
+  // Track whether user is at the bottom so we don't hijack their scroll 
position
+  // if they scrolled up to read something
+  const isAtBottomRef = useRef<boolean>(true);
+  // Track previous log count to detect new lines arriving
+  const prevLogCountRef = useRef<number>(0);
+
   const rowVirtualizer = useVirtualizer({
     count: parsedLogs.length,
     estimateSize: () => 20,
@@ -94,6 +103,44 @@ export const TaskLogContent = ({ error, isLoading, 
logError, parsedLogs, wrap }:
   const containerHeight = rowVirtualizer.scrollElement?.clientHeight ?? 0;
   const showScrollButtons = parsedLogs.length > 1 && contentHeight > 
containerHeight;
 
+  // Check if user is near the bottom on scroll
+  const handleScroll = useCallback(() => {
+    const el = parentRef.current;
+
+    if (!el) {
+      return;
+    }
+    const distanceFromBottom = el.scrollHeight - el.scrollTop - 
el.clientHeight;
+
+    isAtBottomRef.current = distanceFromBottom <= SCROLL_BOTTOM_THRESHOLD;
+  }, []);
+
+  useEffect(() => {
+    const el = parentRef.current;
+
+    el?.addEventListener("scroll", handleScroll, { passive: true });
+
+    return () => el?.removeEventListener("scroll", handleScroll);
+  }, [handleScroll]);
+
+  // Auto-scroll to bottom when:
+  // 1. Logs first load (prevLogCount was 0)
+  // 2. New lines arrive AND user was already at the bottom
+  useLayoutEffect(() => {
+    if (parsedLogs.length === 0) {
+      return;
+    }
+
+    const isFirstLoad = prevLogCountRef.current === 0;
+    const hasNewLines = parsedLogs.length > prevLogCountRef.current;
+
+    if ((isFirstLoad || (hasNewLines && isAtBottomRef.current)) && 
!location.hash) {
+      rowVirtualizer.scrollToIndex(parsedLogs.length - 1, { align: "end" });
+    }
+
+    prevLogCountRef.current = parsedLogs.length;
+  }, [parsedLogs.length, rowVirtualizer]);
+
   useLayoutEffect(() => {
     if (location.hash && !isLoading) {
       rowVirtualizer.scrollToIndex(Math.min(Number(hash) + 5, 
parsedLogs.length - 1));
@@ -112,8 +159,10 @@ export const TaskLogContent = ({ error, isLoading, 
logError, parsedLogs, wrap }:
     }
 
     if (to === "top") {
+      isAtBottomRef.current = false;
       scrollToTop({ element: el, virtualizer: rowVirtualizer });
     } else {
+      isAtBottomRef.current = true;
       scrollToBottom({ element: el, virtualizer: rowVirtualizer });
     }
   };

Reply via email to