This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 63ba16388dd Add log buffering to increase performance when streaming
big logs (#63180)
63ba16388dd is described below
commit 63ba16388dd7a60fcda4296df66dfb6a2843eef1
Author: Software Developer <[email protected]>
AuthorDate: Mon Mar 30 14:06:45 2026 +0200
Add log buffering to increase performance when streaming big logs (#63180)
* Add buffering back to speed up log loading.
* Fix scrolling for big logs.
* Restore `# type: ignore[arg-type]`; apply PR review remarks.
* Make the buffer size configurable.
* Add back some comments.
---
.../api_fastapi/core_api/routes/public/log.py | 20 ++++++++-
.../src/airflow/config_templates/config.yml | 8 ++++
.../src/pages/TaskInstance/Logs/TaskLogContent.tsx | 51 +++++++++++++++++++++-
3 files changed, 77 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
index 20379876ebe..6b41aa3f5ac 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import contextlib
import textwrap
+from collections.abc import Generator, Iterable
from fastapi import Depends, HTTPException, Request, status
from fastapi.responses import StreamingResponse
@@ -35,11 +36,14 @@ from airflow.api_fastapi.common.types import Mimetype
from airflow.api_fastapi.core_api.datamodels.log import
ExternalLogUrlResponse, TaskInstancesLogResponse
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import DagAccessEntity,
requires_access_dag
+from airflow.configuration import conf
from airflow.exceptions import TaskNotFound
from airflow.models import TaskInstance, Trigger
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.utils.log.log_reader import TaskLogReader
+# Number of NDJSON log lines joined into each chunk written to the streaming response
+# (see _buffered_ndjson_stream). Read once, at module import, from
+# [api] log_stream_buffer_size -- so presumably a config change only takes effect after
+# the API server process restarts.
+_NDJSON_BATCH_SIZE = conf.getint("api", "log_stream_buffer_size")
+
task_instances_log_router = AirflowRouter(
tags=["Task Instance"],
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
)
@@ -59,6 +63,19 @@ ndjson_example_response_for_get_log = {
}
+def _buffered_ndjson_stream(
+ raw_stream: Iterable[str],
+) -> Generator[str, None, None]:
+ buf: list[str] = []
+ for line in raw_stream:
+ buf.append(line)
+ if len(buf) >= _NDJSON_BATCH_SIZE:
+ yield "".join(buf)
+ buf.clear()
+ if buf:
+ yield "".join(buf)
+
+
@task_instances_log_router.get(
"/{task_id}/logs/{try_number}",
responses={
@@ -146,7 +163,8 @@ def get_log(
if accept == Mimetype.NDJSON: # only specified application/x-ndjson will
return streaming response
# LogMetadata(TypedDict) is used as type annotation for log_reader;
added ignore to suppress mypy error
- log_stream = task_log_reader.read_log_stream(ti, try_number, metadata)
# type: ignore[arg-type]
+ raw_stream = task_log_reader.read_log_stream(ti, try_number, metadata)
# type: ignore[arg-type]
+ log_stream = _buffered_ndjson_stream(raw_stream)
headers = None
if not metadata.get("end_of_log", False):
headers = {
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index 55313198d5b..2f1c63a21c1 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1700,6 +1700,14 @@ api:
type: string
example: path/to/logging_config.yaml
default: ~
+ log_stream_buffer_size:
+ description: |
+ Number of log lines to buffer before flushing to the client when streaming task logs
+ in NDJSON format. Larger values reduce HTTP overhead but increase time-to-first-byte.
+ A value of 1 or less flushes every line as soon as it is read.
+ version_added: 3.2.0
+ type: integer
+ example: ~
+ default: "500"
ssl_cert:
description: |
Paths to the SSL certificate and key for the api server. When both are
diff --git
a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx
b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx
index f6df9e1c420..59ba801bacc 100644
--- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx
@@ -18,7 +18,7 @@
*/
import { Box, Code, VStack, IconButton } from "@chakra-ui/react";
import { useVirtualizer } from "@tanstack/react-virtual";
-import { type JSX, useLayoutEffect, useRef } from "react";
+import { type JSX, useLayoutEffect, useRef, useCallback, useEffect } from
"react";
import { useHotkeys } from "react-hotkeys-hook";
import { useTranslation } from "react-i18next";
import { FiChevronDown, FiChevronUp } from "react-icons/fi";
@@ -37,6 +37,9 @@ export type TaskLogContentProps = {
readonly wrap: boolean;
};
+// Distance from the bottom of the log container (in px) within which the user is treated
+// as "at the bottom"; while at the bottom, newly arriving log lines are auto-scrolled into
+// view instead of leaving the viewport where it is.
+const SCROLL_BOTTOM_THRESHOLD = 100;
+
const ScrollToButton = ({
direction,
onClick,
@@ -83,6 +86,12 @@ export const TaskLogContent = ({ error, isLoading, logError,
parsedLogs, wrap }:
const hash = location.hash.replace("#", "");
const parentRef = useRef<HTMLDivElement | null>(null);
+ // Whether the user is scrolled to (near) the bottom of the logs. Auto-scroll only follows
+ // newly arriving lines while this is true, so a user who scrolled up to read something
+ // keeps their position. Held in a ref rather than state, so updating it never re-renders.
+ const isAtBottomRef = useRef<boolean>(true);
+ // Log line count seen on the previous auto-scroll pass; used to detect newly arrived lines.
+ const prevLogCountRef = useRef<number>(0);
+
const rowVirtualizer = useVirtualizer({
count: parsedLogs.length,
estimateSize: () => 20,
@@ -94,6 +103,44 @@ export const TaskLogContent = ({ error, isLoading,
logError, parsedLogs, wrap }:
const containerHeight = rowVirtualizer.scrollElement?.clientHeight ?? 0;
const showScrollButtons = parsedLogs.length > 1 && contentHeight >
containerHeight;
+  // On every scroll of the log container, record whether the viewport is within
+  // SCROLL_BOTTOM_THRESHOLD px of the end, so auto-scroll knows whether to follow new lines.
+  const handleScroll = useCallback(() => {
+    const container = parentRef.current;
+
+    if (container) {
+      const { clientHeight, scrollHeight, scrollTop } = container;
+      const remaining = scrollHeight - scrollTop - clientHeight;
+
+      isAtBottomRef.current = remaining <= SCROLL_BOTTOM_THRESHOLD;
+    }
+  }, []);
+
+  // Subscribe handleScroll to the log container's scroll events (passive: the handler never
+  // calls preventDefault) and unsubscribe in the effect cleanup.
+  // NOTE(review): handleScroll is stable (useCallback with no deps), so the container is read
+  // only once, on mount. If the container is rendered conditionally (e.g. only after loading),
+  // the listener would never be attached -- confirm against the JSX.
+  useEffect(() => {
+    const container = parentRef.current;
+    const unsubscribe = () => {
+      container?.removeEventListener("scroll", handleScroll);
+    };
+
+    container?.addEventListener("scroll", handleScroll, { passive: true });
+
+    return unsubscribe;
+  }, [handleScroll]);
+
+  // Auto-scroll to the last log line when:
+  // 1. logs load for the first time, or load again after having been cleared
+  //    (prevLogCountRef is 0), or
+  // 2. new lines arrive AND the user was already at the bottom.
+  // No auto-scroll happens while the URL carries a hash; the hash-targeting effect handles
+  // that case. location.hash is listed as a dependency because the effect reads it.
+  useLayoutEffect(() => {
+    const logCount = parsedLogs.length;
+
+    if (logCount === 0) {
+      // Forget the previous count when the logs are cleared (presumably while another try or
+      // a refetch is loading). Otherwise the next batch is compared against the stale count:
+      // e.g. 500 -> 0 -> 300 lines would never trigger the initial scroll to the bottom.
+      prevLogCountRef.current = 0;
+
+      return;
+    }
+
+    const isFirstLoad = prevLogCountRef.current === 0;
+    const hasNewLines = logCount > prevLogCountRef.current;
+
+    if ((isFirstLoad || (hasNewLines && isAtBottomRef.current)) && !location.hash) {
+      rowVirtualizer.scrollToIndex(logCount - 1, { align: "end" });
+    }
+
+    prevLogCountRef.current = logCount;
+  }, [location.hash, parsedLogs.length, rowVirtualizer]);
+
useLayoutEffect(() => {
if (location.hash && !isLoading) {
rowVirtualizer.scrollToIndex(Math.min(Number(hash) + 5,
parsedLogs.length - 1));
@@ -112,8 +159,10 @@ export const TaskLogContent = ({ error, isLoading,
logError, parsedLogs, wrap }:
}
if (to === "top") {
+ isAtBottomRef.current = false;
scrollToTop({ element: el, virtualizer: rowVirtualizer });
} else {
+ isAtBottomRef.current = true;
scrollToBottom({ element: el, virtualizer: rowVirtualizer });
}
};