This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0266b0b14eb Add option to have structured JSON logging for _all_ API 
server output (#63365)
0266b0b14eb is described below

commit 0266b0b14eb6312382ae6af7b033084bc83bb9ff
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Mar 12 19:56:16 2026 +0000

    Add option to have structured JSON logging for _all_ API server output 
(#63365)
    
    Previously gunicorn/uvicorn each installed their own log handlers and
    formatters, bypassing Airflow's structlog ProcessorFormatter entirely. This
    meant:
    
    - Gunicorn set up its own StreamHandler on gunicorn.error / gunicorn.access,
      so those records never went through structlog.
    - Uvicorn workers called logging.config.dictConfig(LOGGING_CONFIG) on 
startup,
      overwriting the structlog configuration that was applied before gunicorn
      started.
    - HTTP access log lines came from uvicorn's built-in access logger
      (unstructured).
    - Python warnings.warn() calls and unhandled exceptions bypassed structlog
      too.
    
    Now:
    
    - AirflowGunicornLogger overrides setup() to skip installing gunicorn's own
      handlers and lets records propagate to root (where structlog is 
configured).
    - AirflowUvicornWorker sets log_config=None so uvicorn doesn't clobber the
      logging config, and access_log=False since we handle that ourselves.
    - HttpAccessLogMiddleware replaces uvicorn's access log: one structured 
event
      per request with method, path, status code, duration (µs), client address,
      and the x-request-id header bound to the structlog context for the request
      lifetime. Health-check paths are excluded to avoid noise.
    - configure_logging() now explicitly silences uvicorn.access / 
gunicorn.access
      and routes uvicorn.error / gunicorn.error through the default handler.
    - Python warnings are intercepted and emitted as structured py.warnings log
      events instead of going to stderr.
    - In JSON log mode, unhandled exceptions are emitted via structlog rather 
than
      the plain-text default sys.excepthook.
---
 airflow-core/newsfragments/63365.significant.rst   |   9 ++
 .../auth/managers/simple/simple_auth_manager.py    |  12 +-
 .../airflow/api_fastapi/common/http_access_log.py  | 106 +++++++++++++
 .../src/airflow/api_fastapi/core_api/app.py        |  11 +-
 .../src/airflow/api_fastapi/gunicorn_app.py        |  49 ++++--
 .../src/airflow/cli/commands/api_server_command.py |  71 ++++-----
 .../src/airflow/cli/commands/db_command.py         |  19 +--
 .../src/airflow/cli/commands/kerberos_command.py   |   3 +-
 .../src/airflow/cli/commands/scheduler_command.py  |   3 +-
 .../src/airflow/cli/commands/triggerer_command.py  |   3 +-
 .../src/airflow/config_templates/config.yml        |  10 ++
 airflow-core/src/airflow/logging_config.py         |  18 ++-
 .../0101_3_2_0_ui_improvements_for_deadlines.py    |  74 +++++----
 airflow-core/src/airflow/utils/cli.py              |   8 +
 airflow-core/src/airflow/utils/db.py               |  12 +-
 airflow-core/src/airflow/utils/helpers.py          |  10 +-
 airflow-core/src/airflow/utils/serve_logs/core.py  |  13 +-
 .../api_fastapi/common/test_http_access_log.py     | 132 ++++++++++++++++
 .../unit/cli/commands/test_api_server_command.py   |   6 +-
 .../unit/cli/commands/test_gunicorn_monitor.py     |  33 ++--
 .../src/airflow_shared/logging/structlog.py        |  75 +++++++++-
 shared/logging/tests/logging/test_structlog.py     | 166 +++++++++++++++++++++
 task-sdk/src/airflow/sdk/log.py                    |  58 +------
 23 files changed, 696 insertions(+), 205 deletions(-)

diff --git a/airflow-core/newsfragments/63365.significant.rst 
b/airflow-core/newsfragments/63365.significant.rst
new file mode 100644
index 00000000000..494055ec525
--- /dev/null
+++ b/airflow-core/newsfragments/63365.significant.rst
@@ -0,0 +1,9 @@
+Structured JSON logging for all API server output
+
+The new ``json_logs`` option under the ``[logging]`` section makes Airflow
+produce all its output as newline-delimited JSON (structured logs) instead of
+human-readable formatted logs. This covers the API server (gunicorn/uvicorn),
+including access logs, warnings, and unhandled exceptions.
+
+Not all components support this yet — notably ``airflow celery worker`` — but
+any non-JSON output when ``json_logs`` is enabled will be treated as a bug.
diff --git 
a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
 
b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
index 4d7c9850351..0f89fd20c4c 100644
--- 
a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
+++ 
b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
@@ -433,7 +433,11 @@ class 
SimpleAuthManager(BaseAuthManager[SimpleAuthManagerUser]):
 
     @staticmethod
     def _print_output(output: str):
-        name = "Simple auth manager"
-        colorized_name = colored(f"{name:10}", "white")
-        for line in output.splitlines():
-            print(f"{colorized_name} | {line.strip()}")
+        if conf.getboolean("logging", "json_logs", fallback=False):
+            for line in output.splitlines():
+                log.info(line.strip())
+        else:
+            name = "Simple auth manager"
+            colorized_name = colored(f"{name:10}", "white")
+            for line in output.splitlines():
+                print(f"{colorized_name} | {line.strip()}")
diff --git a/airflow-core/src/airflow/api_fastapi/common/http_access_log.py 
b/airflow-core/src/airflow/api_fastapi/common/http_access_log.py
new file mode 100644
index 00000000000..581c3593790
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/common/http_access_log.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""HTTP access log middleware using structlog."""
+
+from __future__ import annotations
+
+import contextlib
+import time
+from typing import TYPE_CHECKING
+
+import structlog
+
+if TYPE_CHECKING:
+    from starlette.types import ASGIApp, Message, Receive, Scope, Send
+
+logger = structlog.get_logger(logger_name="http.access")
+
+_HEALTH_PATHS = frozenset(["/api/v2/monitor/health"])
+
+
+class HttpAccessLogMiddleware:
+    """
+    Log completed HTTP requests as structured log events.
+
+    This middleware replaces uvicorn's built-in access logger. It measures the
+    full round-trip duration, binds any ``x-request-id`` header value to the
+    structlog context for the duration of the request, and emits one log event
+    per completed request.
+
+    Health-check paths are excluded to avoid log noise.
+    """
+
+    def __init__(
+        self,
+        app: ASGIApp,
+        request_id_header: str = "x-request-id",
+    ) -> None:
+        self.app = app
+        self.request_id_header = request_id_header.lower().encode("ascii")
+
+    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> 
None:
+        if scope["type"] != "http":
+            await self.app(scope, receive, send)
+            return
+
+        start = time.monotonic_ns()
+        response: Message | None = None
+
+        async def capture_send(message: Message) -> None:
+            nonlocal response
+            if message["type"] == "http.response.start":
+                response = message
+            await send(message)
+
+        request_id: str | None = None
+        for name, value in scope["headers"]:
+            if name == self.request_id_header:
+                request_id = value.decode("ascii", errors="replace")
+                break
+
+        ctx = (
+            structlog.contextvars.bound_contextvars(request_id=request_id)
+            if request_id is not None
+            else contextlib.nullcontext()
+        )
+
+        with ctx:
+            try:
+                await self.app(scope, receive, capture_send)
+            except Exception:
+                if response is None:
+                    response = {"status": 500}
+                raise
+            finally:
+                path = scope["path"]
+                if path not in _HEALTH_PATHS:
+                    duration_us = (time.monotonic_ns() - start) // 1000
+                    status = response["status"] if response is not None else 0
+                    method = scope.get("method", "")
+                    query = scope["query_string"].decode("ascii", 
errors="replace")
+                    client = scope.get("client")
+                    client_addr = f"{client[0]}:{client[1]}" if client else 
None
+
+                    logger.info(
+                        "request finished",
+                        method=method,
+                        path=path,
+                        query=query,
+                        status_code=status,
+                        duration_us=duration_us,
+                        client_addr=client_addr,
+                    )
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/app.py 
b/airflow-core/src/airflow/api_fastapi/core_api/app.py
index 5c0bc562f12..2622aec86b9 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/app.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/app.py
@@ -165,6 +165,7 @@ def init_error_handlers(app: FastAPI) -> None:
 
 def init_middlewares(app: FastAPI) -> None:
     from airflow.api_fastapi.auth.middlewares.refresh_token import 
JWTRefreshMiddleware
+    from airflow.api_fastapi.common.http_access_log import 
HttpAccessLogMiddleware
 
     app.add_middleware(JWTRefreshMiddleware)
     if conf.getboolean("core", "simple_auth_manager_all_admins"):
@@ -172,8 +173,10 @@ def init_middlewares(app: FastAPI) -> None:
 
         app.add_middleware(SimpleAllAdminMiddleware)
 
-    # The GzipMiddleware should be the last middleware added as 
https://github.com/apache/airflow/issues/60165 points out.
-    # Compress responses greater than 1kB with optimal compression level as 5
-    # with level ranging from 1 to 9 with 1 (fastest, least compression)
-    # and 9 (slowest, most compression)
+    # GZipMiddleware must be inside HttpAccessLogMiddleware so that access 
logs capture
+    # the full end-to-end duration including compression time.
+    # See https://github.com/apache/airflow/issues/60165
     app.add_middleware(GZipMiddleware, minimum_size=1024, compresslevel=5)
+    # HttpAccessLogMiddleware must be outermost (added last) so it times the 
full
+    # request lifecycle including all inner middleware.
+    app.add_middleware(HttpAccessLogMiddleware)
diff --git a/airflow-core/src/airflow/api_fastapi/gunicorn_app.py 
b/airflow-core/src/airflow/api_fastapi/gunicorn_app.py
index e52a681b006..c01d3e8b2aa 100644
--- a/airflow-core/src/airflow/api_fastapi/gunicorn_app.py
+++ b/airflow-core/src/airflow/api_fastapi/gunicorn_app.py
@@ -30,14 +30,16 @@ The pattern follows gunicorn's recommended extension 
approach:
 
 from __future__ import annotations
 
-import logging
 import signal
 import sys
 import time
 from typing import TYPE_CHECKING, Any
 
+import structlog
 from gunicorn.app.base import BaseApplication
 from gunicorn.arbiter import Arbiter
+from gunicorn.glogging import Logger as GunicornLogger
+from uvicorn.workers import UvicornWorker
 
 from airflow.configuration import conf
 
@@ -45,7 +47,39 @@ if TYPE_CHECKING:
     from fastapi import FastAPI
     from gunicorn.app.base import Application
 
-log = logging.getLogger(__name__)
+log = structlog.get_logger(__name__)
+
+
+class AirflowGunicornLogger(GunicornLogger):
+    """
+    Gunicorn logger that routes all output through Airflow's logging setup.
+
+    Gunicorn's default Logger.setup() installs its own StreamHandler with a 
custom
+    formatter on ``gunicorn.error`` and ``gunicorn.access``, bypassing any 
root-level
+    handler (including our structlog ProcessorFormatter). Overriding setup() 
to do
+    nothing lets records propagate to root where Airflow's handler picks them 
up.
+    """
+
+    def setup(self, cfg) -> None:
+        self.error_log.propagate = True
+        self.access_log.propagate = True
+
+
+class AirflowUvicornWorker(UvicornWorker):
+    """
+    Uvicorn worker that preserves Airflow's structlog-based logging setup.
+
+    Uvicorn workers normally call 
``logging.config.dictConfig(LOGGING_CONFIG)`` on startup
+    which would override any structlog configuration applied before gunicorn 
starts.
+    Setting ``log_config=None`` prevents that. ``access_log=False`` disables 
uvicorn's
+    built-in access logger because ``HttpAccessLogMiddleware`` handles access 
logging.
+    """
+
+    CONFIG_KWARGS = {
+        **UvicornWorker.CONFIG_KWARGS,
+        "log_config": None,
+        "access_log": False,
+    }
 
 
 class AirflowArbiter(Arbiter):
@@ -198,8 +232,7 @@ class AirflowGunicornApp(BaseApplication):
         try:
             AirflowArbiter(self).run()
         except RuntimeError as e:
-            print(f"\nError: {e}\n", file=sys.stderr)
-            sys.stderr.flush()
+            log.error("Gunicorn failed to start", error=str(e))
             sys.exit(1)
 
 
@@ -210,7 +243,6 @@ def create_gunicorn_app(
     worker_timeout: int,
     ssl_cert: str | None = None,
     ssl_key: str | None = None,
-    access_log: bool = True,
     log_level: str = "info",
     proxy_headers: bool = False,
 ) -> AirflowGunicornApp:
@@ -223,18 +255,18 @@ def create_gunicorn_app(
     :param worker_timeout: Worker timeout in seconds
     :param ssl_cert: Path to SSL certificate file
     :param ssl_key: Path to SSL key file
-    :param access_log: Whether to enable access logging
     :param log_level: Log level (debug, info, warning, error, critical)
     :param proxy_headers: Whether to trust proxy headers
     """
     options = {
         "bind": f"{host}:{port}",
         "workers": num_workers,
-        "worker_class": "uvicorn.workers.UvicornWorker",
+        "worker_class": 
"airflow.api_fastapi.gunicorn_app.AirflowUvicornWorker",
         "timeout": worker_timeout,
         "graceful_timeout": worker_timeout,
         "keepalive": worker_timeout,
         "loglevel": log_level,
+        "logger_class": 
"airflow.api_fastapi.gunicorn_app.AirflowGunicornLogger",
         "preload_app": True,
         # Use our gunicorn_config module for hooks (post_worker_init, 
worker_exit)
         "config": "python:airflow.api_fastapi.gunicorn_config",
@@ -244,9 +276,6 @@ def create_gunicorn_app(
         options["certfile"] = ssl_cert
         options["keyfile"] = ssl_key
 
-    if access_log:
-        options["accesslog"] = "-"  # Log to stdout
-
     if proxy_headers:
         options["forwarded_allow_ips"] = "*"
 
diff --git a/airflow-core/src/airflow/cli/commands/api_server_command.py 
b/airflow-core/src/airflow/cli/commands/api_server_command.py
index dc7b4fabf9a..11b57305b1a 100644
--- a/airflow-core/src/airflow/cli/commands/api_server_command.py
+++ b/airflow-core/src/airflow/cli/commands/api_server_command.py
@@ -18,7 +18,6 @@
 
 from __future__ import annotations
 
-import logging
 import os
 import sys
 import textwrap
@@ -26,9 +25,9 @@ from collections.abc import Callable
 from functools import wraps
 from typing import TYPE_CHECKING, TypeVar
 
+import structlog
 import uvicorn
 
-from airflow import settings
 from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException
@@ -40,7 +39,7 @@ from airflow.utils.providers_configuration_loader import 
providers_configuration
 PS = ParamSpec("PS")
 RT = TypeVar("RT")
 
-log = logging.getLogger(__name__)
+log = structlog.get_logger(__name__)
 
 if TYPE_CHECKING:
     from argparse import Namespace
@@ -68,19 +67,6 @@ def _run_api_server_with_gunicorn(
     ssl_cert, ssl_key = _get_ssl_cert_and_key_filepaths(args)
 
     log_level = conf.get("logging", "uvicorn_logging_level", 
fallback="info").lower()
-    access_log_enabled = log_level not in ("error", "critical", "fatal")
-
-    log.info(
-        textwrap.dedent(
-            f"""\
-            Running the API server with gunicorn:
-            Apps: {apps}
-            Workers: {num_workers}
-            Host: {args.host}:{args.port}
-            Timeout: {worker_timeout}
-            
================================================================="""
-        )
-    )
 
     gunicorn_app = create_gunicorn_app(
         host=args.host,
@@ -89,7 +75,6 @@ def _run_api_server_with_gunicorn(
         worker_timeout=worker_timeout,
         ssl_cert=ssl_cert,
         ssl_key=ssl_key,
-        access_log=access_log_enabled,
         log_level=log_level,
         proxy_headers=proxy_headers,
     )
@@ -122,10 +107,7 @@ def _run_api_server_with_uvicorn(
 
         setproctitle(f"airflow api_server -- host:{args.host} 
port:{args.port}")
 
-    # Get uvicorn logging configuration from Airflow settings
     uvicorn_log_level = conf.get("logging", "uvicorn_logging_level", 
fallback="info").lower()
-    # Control access log based on uvicorn log level - disable for ERROR and 
above
-    access_log_enabled = uvicorn_log_level not in ("error", "critical", 
"fatal")
 
     uvicorn_kwargs = {
         "host": args.host,
@@ -136,11 +118,13 @@ def _run_api_server_with_uvicorn(
         "timeout_worker_healthcheck": worker_timeout,
         "ssl_keyfile": ssl_key,
         "ssl_certfile": ssl_cert,
-        "access_log": access_log_enabled,
+        # HttpAccessLogMiddleware handles access logging; disable uvicorn's 
built-in access log.
+        "access_log": False,
         "log_level": uvicorn_log_level,
         "proxy_headers": proxy_headers,
+        # Prevent uvicorn from overriding our structlog-based logging setup.
+        "log_config": None,
     }
-    # Only set the log_config if it is provided, otherwise use the default 
uvicorn logging configuration.
     if args.log_config and args.log_config != "-":
         # The [api/log_config] is migrated from [api/access_logfile] and 
[api/access_logfile] defaults to "-" for stdout for Gunicorn.
         # So we need to check if the log_config is set to "-" or not; if it is 
set to "-", we regard it as not set.
@@ -157,41 +141,50 @@ def _run_api_server(args, apps: str, num_workers: int, 
worker_timeout: int, prox
     """Run the API server using the configured server type."""
     server_type = conf.get("api", "server_type", fallback="uvicorn").lower()
 
+    run = _run_api_server_with_uvicorn
     if server_type == "gunicorn":
         try:
             import gunicorn  # noqa: F401
+
+            run = _run_api_server_with_gunicorn
         except ImportError:
             raise AirflowConfigException(
                 "Gunicorn is not installed. Install it with: pip install 
'apache-airflow-core[gunicorn]'"
             )
 
-        _run_api_server_with_gunicorn(
-            args=args,
+    log_file = args.log_file or None
+    if conf.getboolean("logging", "json_logs", fallback=False):
+        extra = {"logfile": log_file} if log_file else {}
+        log.info(
+            "Running the API server",
+            server=server_type,
             apps=apps,
-            num_workers=num_workers,
-            worker_timeout=worker_timeout,
-            proxy_headers=proxy_headers,
+            workers=num_workers,
+            host=f"{args.host}:{args.port}",
+            timeout=worker_timeout,
+            **extra,
         )
     else:
-        log.info(
+        print(
             textwrap.dedent(
                 f"""\
-                Running the API server with uvicorn:
+                Running the API server with {server_type}:
                 Apps: {apps}
                 Workers: {num_workers}
                 Host: {args.host}:{args.port}
                 Timeout: {worker_timeout}
-                Logfiles: {args.log_file or "-"}
-                
================================================================="""
+                Logfiles: {log_file or "-"}
+                
=================================================================""",
             )
         )
-        _run_api_server_with_uvicorn(
-            args=args,
-            apps=apps,
-            num_workers=num_workers,
-            worker_timeout=worker_timeout,
-            proxy_headers=proxy_headers,
-        )
+
+    run(
+        args=args,
+        apps=apps,
+        num_workers=num_workers,
+        worker_timeout=worker_timeout,
+        proxy_headers=proxy_headers,
+    )
 
 
 def with_api_apps_env(func: Callable[[Namespace], RT]) -> 
Callable[[Namespace], RT]:
@@ -221,7 +214,7 @@ def with_api_apps_env(func: Callable[[Namespace], RT]) -> 
Callable[[Namespace],
 @with_api_apps_env
 def api_server(args: Namespace):
     """Start Airflow API server."""
-    print(settings.HEADER)
+    cli_utils.print_banner()
 
     apps = args.apps
     num_workers = args.workers
diff --git a/airflow-core/src/airflow/cli/commands/db_command.py 
b/airflow-core/src/airflow/cli/commands/db_command.py
index f5c1a061a1a..9a2a199707d 100644
--- a/airflow-core/src/airflow/cli/commands/db_command.py
+++ b/airflow-core/src/airflow/cli/commands/db_command.py
@@ -18,12 +18,12 @@
 
 from __future__ import annotations
 
-import logging
 import os
 import textwrap
 from tempfile import NamedTemporaryFile
 from typing import TYPE_CHECKING
 
+import structlog
 from packaging.version import InvalidVersion, parse as parse_version
 from tenacity import Retrying, stop_after_attempt, wait_fixed
 
@@ -38,7 +38,7 @@ from airflow.utils.providers_configuration_loader import 
providers_configuration
 if TYPE_CHECKING:
     from tenacity import RetryCallState
 
-log = logging.getLogger(__name__)
+log = structlog.get_logger(__name__)
 
 
 @providers_configuration_loaded
@@ -94,7 +94,7 @@ def run_db_migrate_command(args, command, revision_heads_map: 
dict[str, str]):
 
     :meta private:
     """
-    print(f"DB: {settings.get_engine().url!r}")
+    db_url = str(settings.get_engine().url)
     if args.to_revision and args.to_version:
         raise SystemExit("Cannot supply both `--to-revision` and 
`--to-version`.")
     if args.from_version and args.from_revision:
@@ -128,16 +128,16 @@ def run_db_migrate_command(args, command, 
revision_heads_map: dict[str, str]):
         to_revision = args.to_revision
 
     if not args.show_sql_only:
-        print(f"Performing upgrade to the metadata database 
{settings.get_engine().url!r}")
+        log.info("Performing upgrade to the metadata database", url=db_url)
     else:
-        print("Generating sql for upgrade -- upgrade commands will *not* be 
submitted.")
+        log.info("Generating sql for upgrade -- upgrade commands will *not* be 
submitted.")
     command(
         to_revision=to_revision,
         from_revision=from_revision,
         show_sql_only=args.show_sql_only,
     )
     if not args.show_sql_only:
-        print("Database migration done!")
+        log.info("Database migration done!")
 
 
 def run_db_downgrade_command(args, command, revision_heads_map: dict[str, 
str]):
@@ -171,10 +171,11 @@ def run_db_downgrade_command(args, command, 
revision_heads_map: dict[str, str]):
             raise SystemExit(f"Downgrading to version {args.to_version} is not 
supported.")
     elif args.to_revision:
         to_revision = args.to_revision
+    db_url = str(settings.get_engine().url)
     if not args.show_sql_only:
-        print(f"Performing downgrade with database 
{settings.get_engine().url!r}")
+        log.info("Performing downgrade with database", url=db_url)
     else:
-        print("Generating sql for downgrade -- downgrade commands will *not* 
be submitted.")
+        log.info("Generating sql for downgrade -- downgrade commands will 
*not* be submitted.")
 
     if args.show_sql_only or (
         args.yes
@@ -187,7 +188,7 @@ def run_db_downgrade_command(args, command, 
revision_heads_map: dict[str, str]):
     ):
         command(to_revision=to_revision, from_revision=from_revision, 
show_sql_only=args.show_sql_only)
         if not args.show_sql_only:
-            print("Downgrade complete")
+            log.info("Downgrade complete")
     else:
         raise SystemExit("Cancelled")
 
diff --git a/airflow-core/src/airflow/cli/commands/kerberos_command.py 
b/airflow-core/src/airflow/cli/commands/kerberos_command.py
index 827f2d6a9d0..05b4f860df5 100644
--- a/airflow-core/src/airflow/cli/commands/kerberos_command.py
+++ b/airflow-core/src/airflow/cli/commands/kerberos_command.py
@@ -18,7 +18,6 @@
 
 from __future__ import annotations
 
-from airflow import settings
 from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
 from airflow.security import kerberos as krb
 from airflow.security.kerberos import KerberosMode
@@ -30,7 +29,7 @@ from airflow.utils.providers_configuration_loader import 
providers_configuration
 @providers_configuration_loaded
 def kerberos(args):
     """Start a kerberos ticket renewer."""
-    print(settings.HEADER)
+    cli_utils.print_banner()
 
     mode = KerberosMode.STANDARD
     if args.one_time:
diff --git a/airflow-core/src/airflow/cli/commands/scheduler_command.py 
b/airflow-core/src/airflow/cli/commands/scheduler_command.py
index f797d656257..b79bc25fb01 100644
--- a/airflow-core/src/airflow/cli/commands/scheduler_command.py
+++ b/airflow-core/src/airflow/cli/commands/scheduler_command.py
@@ -23,7 +23,6 @@ from argparse import Namespace
 from contextlib import contextmanager
 from multiprocessing import Process
 
-from airflow import settings
 from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
 from airflow.configuration import conf
 from airflow.executors.executor_loader import ExecutorLoader
@@ -53,7 +52,7 @@ def _run_scheduler_job(args) -> None:
 @providers_configuration_loaded
 def scheduler(args: Namespace):
     """Start Airflow Scheduler."""
-    print(settings.HEADER)
+    cli_utils.print_banner()
 
     if args.only_idle and args.num_runs <= 0:
         raise SystemExit("The --only-idle flag requires --num-runs to be set 
to a positive number.")
diff --git a/airflow-core/src/airflow/cli/commands/triggerer_command.py 
b/airflow-core/src/airflow/cli/commands/triggerer_command.py
index ed8a08d1d73..8b9ee178a19 100644
--- a/airflow-core/src/airflow/cli/commands/triggerer_command.py
+++ b/airflow-core/src/airflow/cli/commands/triggerer_command.py
@@ -23,7 +23,6 @@ from contextlib import contextmanager
 from functools import partial
 from multiprocessing import Process
 
-from airflow import settings
 from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException
@@ -68,7 +67,7 @@ def triggerer(args):
 
     SecretsMasker.enable_log_masking()
 
-    print(settings.HEADER)
+    cli_utils.print_banner()
     if args.queues and not conf.getboolean("triggerer", "queues_enabled", 
fallback=False):
         raise AirflowConfigException(
             "--queues option may only be used when triggerer.queues_enabled is 
`True`."
diff --git a/airflow-core/src/airflow/config_templates/config.yml 
b/airflow-core/src/airflow/config_templates/config.yml
index f931fedc473..0c002f5276c 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -832,6 +832,16 @@ logging:
       type: string
       example: ~
       default: "WARNING"
+    json_logs:
+      description: |
+        Enable JSON-structured logging for all output. When ``True``, every 
log line emitted by
+        the process is a single-line JSON object (including HTTP access logs 
from the API server).
+        Recommended for production / cloud-native deployments where logs are 
collected by a log
+        aggregator.
+      version_added: "3.2.0"
+      type: boolean
+      example: "True"
+      default: "False"
     uvicorn_logging_level:
       description: |
         Logging level for uvicorn (API server and serve-logs).
diff --git a/airflow-core/src/airflow/logging_config.py 
b/airflow-core/src/airflow/logging_config.py
index da15d0c246a..0da017d9b08 100644
--- a/airflow-core/src/airflow/logging_config.py
+++ b/airflow-core/src/airflow/logging_config.py
@@ -115,14 +115,30 @@ def configure_logging():
             log_format=getattr(logging_config, "LOG_FORMAT", 
conf.get("logging", "log_format", fallback="")),
             callsite_params=conf.getlist("logging", "callsite_parameters", 
fallback=[]),
         )
+        json_output = conf.getboolean("logging", "json_logs", fallback=False)
+
+        stdlib_config = dict(logging_config)
+        # Route uvicorn/gunicorn error loggers explicitly through our handler 
so their output
+        # is formatted correctly regardless of what propagation state those 
loggers end up in.
+        # Suppress the built-in access loggers; HttpAccessLogMiddleware and
+        # AirflowUvicornWorker.CONFIG_KWARGS take over access logging instead.
+        extra_loggers = {
+            "uvicorn.access": {"handlers": [], "propagate": False},
+            "gunicorn.access": {"handlers": [], "propagate": False},
+            "uvicorn.error": {"handlers": ["default"], "propagate": False},
+            "gunicorn.error": {"handlers": ["default"], "propagate": False},
+        }
+        stdlib_config = {**stdlib_config, "loggers": 
{**stdlib_config.get("loggers", {}), **extra_loggers}}
+
         configure_logging(
             log_level=level,
             namespace_log_levels=conf.get("logging", "namespace_levels", 
fallback=None),
-            stdlib_config=logging_config,
+            stdlib_config=stdlib_config,
             log_format=log_fmt,
             log_timestamp_format=conf.get("logging", "log_timestamp_format", 
fallback="iso"),
             callsite_parameters=callsite_params,
             colors=colors,
+            json_output=json_output,
         )
     except (ValueError, KeyError) as e:
         log.error("Unable to load the config, contains a configuration error.")
diff --git 
a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
 
b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
index 4188254ca2e..92bfc558d47 100644
--- 
a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
+++ 
b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
@@ -37,6 +37,7 @@ from collections.abc import Iterable
 from typing import TYPE_CHECKING
 
 import sqlalchemy as sa
+import structlog
 import uuid6
 from alembic import context, op
 
@@ -46,6 +47,9 @@ from airflow.serialization.enums import Encoding
 from airflow.utils.hashlib_wrapper import md5
 from airflow.utils.sqlalchemy import UtcDateTime
 
+log = structlog.get_logger(__name__)
+
+
 if TYPE_CHECKING:
     from typing import Any
 
@@ -285,7 +289,7 @@ def validate_written_data(
     ).fetchone()
 
     if not validation_result:
-        print(f"ERROR: Failed to read back deadline_alert for DeadlineAlert 
{deadline_alert_id}")
+        log.error("Failed to read back deadline_alert", 
deadline_alert_id=deadline_alert_id)
         return False
 
     checks = [
@@ -296,7 +300,7 @@ def validate_written_data(
 
     for name, actual, expected in checks:
         if actual != expected:
-            print(f"ERROR: Written {name} does not match expected! Written: 
{actual}, Expected: {expected}")
+            log.error("Written value does not match expected", field=name, 
actual=actual, expected=expected)
             return False
 
     return True
@@ -304,11 +308,9 @@ def validate_written_data(
 
 def report_errors(errors: ErrorDict, operation: str = "migration") -> None:
     if errors:
-        print(f"{len(errors)} Dags encountered errors: ")
-        for dag_id, error in errors.items():
-            print(f"  {dag_id}: {'; '.join(error)}")
+        log.warning("Dags encountered errors", operation=operation, 
count=len(errors), errors=dict(errors))
     else:
-        print(f"No Dags encountered errors during {operation}.")
+        log.info("No Dags encountered errors", operation=operation)
 
 
 def hash_dag(dag_data):
@@ -355,14 +357,10 @@ def _sort_serialized_dag_dict(serialized_dag: Any):
 def migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
     """Extract DeadlineAlert data from serialized Dag data and populate 
deadline_alert table."""
     if context.is_offline_mode():
-        print(
-            """
-            ------------
-            --  WARNING: Unable to migrate DeadlineAlert data while in offline 
mode!
-            --  The deadline_alert table will remain empty in offline mode.
-            --  Run the migration in online mode to populate the 
deadline_alert table.
-            ------------
-            """
+        log.warning(
+            "Unable to migrate DeadlineAlert data while in offline mode -- "
+            "the deadline_alert table will remain empty. "
+            "Run the migration in online mode to populate the deadline_alert 
table."
         )
         return
 
@@ -383,8 +381,7 @@ def 
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
     ).scalar()
     total_batches = (total_dags + BATCH_SIZE - 1) // BATCH_SIZE
 
-    print(f"Using migration_batch_size of {BATCH_SIZE} as set in Airflow 
configuration.")
-    print(f"Starting migration of {total_dags} Dags in {total_batches} 
batches.\n")
+    log.info("Starting migration", batch_size=BATCH_SIZE, 
total_dags=total_dags, total_batches=total_batches)
 
     while True:
         batch_num += 1
@@ -424,7 +421,7 @@ def 
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
         if not batch_results:
             break
 
-        print(f"Processing batch {batch_num}...")
+        log.info("Processing batch", batch_num=batch_num, 
total_batches=total_batches)
 
         for serialized_dag_id, dag_id, data, data_compressed, created_at in 
batch_results:
             processed_dags.append(dag_id)
@@ -555,13 +552,15 @@ def 
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
                 dags_with_errors[dag_id].append(f"Could not process serialized 
Dag: {e}")
                 savepoint.rollback()
 
-        print(f"Batch {batch_num} of {total_batches} complete.")
+        log.info("Batch complete", batch_num=batch_num, 
total_batches=total_batches)
 
-    print(
-        f"\nProcessed {len(processed_dags)} serialized_dag records 
({len(set(processed_dags))} "
-        f"unique Dags), {len(dags_with_deadlines)} had DeadlineAlerts."
+    log.info(
+        "Migration complete",
+        processed_records=len(processed_dags),
+        unique_dags=len(set(processed_dags)),
+        dags_with_deadlines=len(dags_with_deadlines),
+        migrated_alerts=migrated_alerts_count,
     )
-    print(f"Migrated {migrated_alerts_count} DeadlineAlert configurations.")
     report_errors(dags_with_errors, "migration")
 
 
@@ -570,14 +569,10 @@ def migrate_deadline_alert_data_back_to_serialized_dag() 
-> None:
     from alembic import context
 
     if context.is_offline_mode():
-        print(
-            """
-            ------------
-            --  WARNING: Unable to restore DeadlineAlert data while in offline 
mode!
-            --  The downgrade will skip data restoration in offline mode.
-            --  Run the migration in online mode to restore the deadline_alert 
data.
-            ------------
-            """
+        log.warning(
+            "Unable to restore DeadlineAlert data while in offline mode -- "
+            "the downgrade will skip data restoration. "
+            "Run the migration in online mode to restore the deadline_alert 
data."
         )
         return
 
@@ -603,8 +598,7 @@ def migrate_deadline_alert_data_back_to_serialized_dag() -> 
None:
 
     total_batches = (total_dags + BATCH_SIZE - 1) // BATCH_SIZE
 
-    print(f"Using migration_batch_size of {BATCH_SIZE} as set in Airflow 
configuration.")
-    print(f"Starting downgrade of {total_dags} Dags with DeadlineAlerts in 
{total_batches} batches.\n")
+    log.info("Starting downgrade", batch_size=BATCH_SIZE, 
total_dags=total_dags, total_batches=total_batches)
 
     while True:
         batch_num += 1
@@ -643,7 +637,7 @@ def migrate_deadline_alert_data_back_to_serialized_dag() -> 
None:
         batch_results = list(result)
         if not batch_results:
             break
-        print(f"Processing batch {batch_num}...")
+        log.info("Processing batch", batch_num=batch_num, 
total_batches=total_batches)
 
         for serialized_dag_id, dag_id, data, data_compressed in batch_results:
             processed_dags.append(dag_id)
@@ -660,7 +654,7 @@ def migrate_deadline_alert_data_back_to_serialized_dag() -> 
None:
                     continue
 
                 if not all(isinstance(uuid_val, str) for uuid_val in 
deadline_uuids):
-                    print(f"WARNING: Dag {dag_id} has non-string deadline 
values, skipping")
+                    log.warning("Dag has non-string deadline values, 
skipping", dag_id=dag_id)
                     continue
 
                 dags_with_deadlines.add(dag_id)
@@ -704,11 +698,13 @@ def migrate_deadline_alert_data_back_to_serialized_dag() 
-> None:
                 dags_with_errors[dag_id].append(f"Could not restore deadline: 
{e}")
                 savepoint.rollback()
 
-        print(f"Batch {batch_num} of {total_batches} complete.")
+        log.info("Batch complete", batch_num=batch_num, 
total_batches=total_batches)
 
-    print(
-        f"\nProcessed {len(processed_dags)} serialized_dag records 
({len(set(processed_dags))} "
-        f"unique Dags), {len(dags_with_deadlines)} had DeadlineAlerts."
+    log.info(
+        "Downgrade complete",
+        processed_records=len(processed_dags),
+        unique_dags=len(set(processed_dags)),
+        dags_with_deadlines=len(dags_with_deadlines),
+        restored_alerts=restored_alerts_count,
     )
-    print(f"Restored {restored_alerts_count} DeadlineAlert configurations to 
original format.")
     report_errors(dags_with_errors, "downgrade")
diff --git a/airflow-core/src/airflow/utils/cli.py 
b/airflow-core/src/airflow/utils/cli.py
index 4df33b26894..6af75dd5b5e 100644
--- a/airflow-core/src/airflow/utils/cli.py
+++ b/airflow-core/src/airflow/utils/cli.py
@@ -386,6 +386,14 @@ def setup_logging(filename):
     return handler.stream
 
 
+def print_banner() -> None:
+    """Print the Airflow ASCII art banner, unless JSON logging is enabled."""
+    from airflow.configuration import conf
+
+    if not conf.getboolean("logging", "json_logs", fallback=False):
+        print(settings.HEADER)
+
+
 def sigint_handler(sig, frame):
     """
     Return without error on SIGINT or SIGTERM signals in interactive command 
mode.
diff --git a/airflow-core/src/airflow/utils/db.py 
b/airflow-core/src/airflow/utils/db.py
index 71f03ea91ce..93041bc188d 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -947,8 +947,9 @@ def check_and_run_migrations():
     if sys.stdout.isatty() and verb:
         print()
         question = f"Please confirm database {verb} (or wait 4 seconds to skip 
it). Are you sure? [y/N]"
+        print_fn = log.info if conf.getboolean("logging", "json_logs", 
fallback=False) else print
         try:
-            answer = helpers.prompt_with_timeout(question, timeout=4, 
default=False)
+            answer = helpers.prompt_with_timeout(question, timeout=4, 
default=False, output_fn=print_fn)
             if answer:
                 try:
                     db_command()
@@ -969,10 +970,11 @@ def check_and_run_migrations():
     elif source_heads != db_heads:
         from airflow.version import version
 
-        print(
-            f"ERROR: You need to {verb} the database. Please run `airflow db 
{command_name}`. "
-            f"Make sure the command is run using Airflow version {version}.",
-            file=sys.stderr,
+        log.error(
+            "Database migration required. Please run `airflow db %s`. "
+            "Make sure the command is run using Airflow version %s.",
+            command_name,
+            version,
         )
         sys.exit(1)
 
diff --git a/airflow-core/src/airflow/utils/helpers.py 
b/airflow-core/src/airflow/utils/helpers.py
index 50bd8b82a46..5e2dd1b9ded 100644
--- a/airflow-core/src/airflow/utils/helpers.py
+++ b/airflow-core/src/airflow/utils/helpers.py
@@ -63,12 +63,12 @@ def validate_key(k: str, max_length: int = 250):
         )
 
 
-def ask_yesno(question: str, default: bool | None = None) -> bool:
+def ask_yesno(question: str, default: bool | None = None, output_fn=print) -> 
bool:
     """Get a yes or no answer from the user."""
     yes = {"yes", "y"}
     no = {"no", "n"}
 
-    print(question)
+    output_fn(question)
     while True:
         choice = input().lower()
         if choice == "" and default is not None:
@@ -77,10 +77,10 @@ def ask_yesno(question: str, default: bool | None = None) 
-> bool:
             return True
         if choice in no:
             return False
-        print("Please respond with y/yes or n/no.")
+        output_fn("Please respond with y/yes or n/no.")
 
 
-def prompt_with_timeout(question: str, timeout: int, default: bool | None = 
None) -> bool:
+def prompt_with_timeout(question: str, timeout: int, default: bool | None = 
None, output_fn=print) -> bool:
     """Ask the user a question and timeout if they don't respond."""
 
     def handler(signum, frame):
@@ -89,7 +89,7 @@ def prompt_with_timeout(question: str, timeout: int, default: 
bool | None = None
     signal.signal(signal.SIGALRM, handler)
     signal.alarm(timeout)
     try:
-        return ask_yesno(question, default)
+        return ask_yesno(question, default, output_fn=output_fn)
     finally:
         signal.alarm(0)
 
diff --git a/airflow-core/src/airflow/utils/serve_logs/core.py 
b/airflow-core/src/airflow/utils/serve_logs/core.py
index c09a9f603f3..5b4ee5aa686 100644
--- a/airflow-core/src/airflow/utils/serve_logs/core.py
+++ b/airflow-core/src/airflow/utils/serve_logs/core.py
@@ -52,9 +52,18 @@ def serve_logs(port=None):
     # Get uvicorn logging configuration from Airflow settings
     uvicorn_log_level = conf.get("logging", "uvicorn_logging_level", 
fallback="info").lower()
 
-    # Use uvicorn directly for ASGI applications
+    # Use uvicorn directly for ASGI applications.
+    # log_config=None: preserve the process's structlog-based logging setup 
rather than
+    # letting uvicorn reset it with its own default formatter.
+    # access_log=False: the log server serves internal file content; HTTP 
access logs
+    # are not needed and would be non-JSON noise when json_logs=True.
     uvicorn.run(
-        "airflow.utils.serve_logs.log_server:get_app", host="", port=port, 
log_level=uvicorn_log_level
+        "airflow.utils.serve_logs.log_server:get_app",
+        host="",
+        port=port,
+        log_level=uvicorn_log_level,
+        log_config=None,
+        access_log=False,
     )
     # Log serving is I/O bound and has low concurrency, so single process is 
sufficient
 
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_http_access_log.py 
b/airflow-core/tests/unit/api_fastapi/common/test_http_access_log.py
new file mode 100644
index 00000000000..3547fdf22ad
--- /dev/null
+++ b/airflow-core/tests/unit/api_fastapi/common/test_http_access_log.py
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import structlog
+import structlog.testing
+from starlette.applications import Starlette
+from starlette.responses import PlainTextResponse
+from starlette.routing import Route
+from starlette.testclient import TestClient
+
+from airflow.api_fastapi.common.http_access_log import _HEALTH_PATHS, 
HttpAccessLogMiddleware
+
+
+def _make_app(raise_exc: bool = False) -> Starlette:
+    async def homepage(request):
+        if raise_exc:
+            raise RuntimeError("boom")
+        return PlainTextResponse("ok")
+
+    async def health(request):
+        return PlainTextResponse("healthy")
+
+    app = Starlette(
+        routes=[
+            Route("/", homepage),
+            Route("/api/v2/monitor/health", health),
+        ]
+    )
+    app.add_middleware(HttpAccessLogMiddleware)
+    return app
+
+
+def test_logs_request_fields():
+    with structlog.testing.capture_logs() as logs:
+        client = TestClient(_make_app(), raise_server_exceptions=False)
+        client.get("/?foo=bar")
+
+    assert len(logs) == 1
+    record = logs[0]
+    assert record["event"] == "request finished"
+    assert record["method"] == "GET"
+    assert record["path"] == "/"
+    assert record["query"] == "foo=bar"
+    assert record["status_code"] == 200
+    assert "duration_us" in record
+    assert isinstance(record["duration_us"], int)
+    assert record["duration_us"] >= 0
+    assert "client_addr" in record
+
+
+def test_health_path_not_logged():
+    with structlog.testing.capture_logs() as logs:
+        client = TestClient(_make_app(), raise_server_exceptions=False)
+        client.get("/api/v2/monitor/health")
+
+    assert logs == []
+
+
+def test_request_id_bound_to_context():
+    """request_id header is bound to structlog contextvars during the 
request."""
+    captured_context: dict = {}
+
+    async def homepage(request):
+        captured_context.update(structlog.contextvars.get_contextvars())
+        return PlainTextResponse("ok")
+
+    app = Starlette(routes=[Route("/", homepage)])
+    app.add_middleware(HttpAccessLogMiddleware)
+
+    TestClient(app).get("/", headers={"x-request-id": "test-id-123"})
+
+    assert captured_context.get("request_id") == "test-id-123"
+
+
+def test_no_request_id_when_header_absent():
+    """No request_id is bound when the header is absent."""
+    captured_context: dict = {}
+
+    async def homepage(request):
+        captured_context.update(structlog.contextvars.get_contextvars())
+        return PlainTextResponse("ok")
+
+    app = Starlette(routes=[Route("/", homepage)])
+    app.add_middleware(HttpAccessLogMiddleware)
+
+    TestClient(app).get("/")
+
+    assert "request_id" not in captured_context
+
+
+def test_exception_logs_500_status():
+    with structlog.testing.capture_logs() as logs:
+        client = TestClient(_make_app(raise_exc=True), 
raise_server_exceptions=False)
+        client.get("/")
+
+    assert len(logs) == 1
+    assert logs[0]["status_code"] == 500
+
+
+def test_non_http_scope_not_logged():
+    """Non-HTTP scopes (e.g. lifespan) are passed through without logging."""
+
+    async def lifespan_app(scope, receive, send):
+        pass
+
+    middleware = HttpAccessLogMiddleware(lifespan_app)
+
+    import asyncio
+
+    with structlog.testing.capture_logs() as logs:
+        asyncio.get_event_loop().run_until_complete(middleware({"type": 
"lifespan"}, None, None))
+
+    assert logs == []
+
+
+def test_health_paths_constant():
+    assert "/api/v2/monitor/health" in _HEALTH_PATHS
diff --git a/airflow-core/tests/unit/cli/commands/test_api_server_command.py 
b/airflow-core/tests/unit/cli/commands/test_api_server_command.py
index 58a0923a2f9..93c2c43e27b 100644
--- a/airflow-core/tests/unit/cli/commands/test_api_server_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_api_server_command.py
@@ -193,9 +193,10 @@ class TestCliApiServer(_CommonCLIUvicornTestClass):
                     "timeout_keep_alive": args.worker_timeout,
                     "timeout_graceful_shutdown": args.worker_timeout,
                     "timeout_worker_healthcheck": args.worker_timeout,
-                    "access_log": True,
+                    "access_log": False,
                     "log_level": "info",
                     "proxy_headers": args.proxy_headers,
+                    "log_config": None,
                     **expected_additional_kwargs,
                 },
             )
@@ -246,9 +247,10 @@ class TestCliApiServer(_CommonCLIUvicornTestClass):
             timeout_worker_healthcheck=60,
             ssl_keyfile=None,
             ssl_certfile=None,
-            access_log=True,
+            access_log=False,
             log_level="info",
             proxy_headers=False,
+            log_config=None,
         )
 
         if demonize:
diff --git a/airflow-core/tests/unit/cli/commands/test_gunicorn_monitor.py 
b/airflow-core/tests/unit/cli/commands/test_gunicorn_monitor.py
index 4964618a385..f80410c0a6c 100644
--- a/airflow-core/tests/unit/cli/commands/test_gunicorn_monitor.py
+++ b/airflow-core/tests/unit/cli/commands/test_gunicorn_monitor.py
@@ -342,8 +342,6 @@ class TestAirflowGunicornApp:
     def test_load_config_gunicorn_cmd_args_overrides_options(self, 
monkeypatch):
         """Test that GUNICORN_CMD_ARGS takes precedence over programmatic 
options."""
         monkeypatch.setenv("GUNICORN_CMD_ARGS", "--workers 8")
-        from gunicorn.config import Config
-
         from airflow.api_fastapi.gunicorn_app import AirflowGunicornApp
 
         def mock_init(self, options):
@@ -416,8 +414,9 @@ class TestCreateGunicornApp:
             assert options["bind"] == "0.0.0.0:8080"
             assert options["workers"] == 4
             assert options["timeout"] == 120
-            assert options["worker_class"] == "uvicorn.workers.UvicornWorker"
+            assert options["worker_class"] == 
"airflow.api_fastapi.gunicorn_app.AirflowUvicornWorker"
             assert options["preload_app"] is True
+            assert "accesslog" not in options
 
     def test_create_app_with_ssl(self):
         """Test creating an app with SSL settings."""
@@ -455,8 +454,8 @@ class TestCreateGunicornApp:
 
             assert options["forwarded_allow_ips"] == "*"
 
-    def test_create_app_with_access_log(self):
-        """Test creating an app with access logging enabled."""
+    def test_create_app_never_sets_accesslog(self):
+        """accesslog is never set; HttpAccessLogMiddleware handles HTTP access 
logging."""
         from airflow.api_fastapi.gunicorn_app import create_gunicorn_app
 
         with mock.patch("airflow.api_fastapi.gunicorn_app.AirflowGunicornApp") 
as mock_app_class:
@@ -465,26 +464,18 @@ class TestCreateGunicornApp:
                 port=8080,
                 num_workers=4,
                 worker_timeout=120,
-                access_log=True,
             )
 
             options = mock_app_class.call_args[0][0]
 
-            assert options["accesslog"] == "-"
-
-    def test_create_app_without_access_log(self):
-        """Test creating an app with access logging disabled."""
-        from airflow.api_fastapi.gunicorn_app import create_gunicorn_app
+            assert "accesslog" not in options
 
-        with mock.patch("airflow.api_fastapi.gunicorn_app.AirflowGunicornApp") 
as mock_app_class:
-            create_gunicorn_app(
-                host="0.0.0.0",
-                port=8080,
-                num_workers=4,
-                worker_timeout=120,
-                access_log=False,
-            )
+    def test_airflow_uvicorn_worker_config(self):
+        """AirflowUvicornWorker disables uvicorn's access log and log_config 
override."""
+        from uvicorn.workers import UvicornWorker
 
-            options = mock_app_class.call_args[0][0]
+        from airflow.api_fastapi.gunicorn_app import AirflowUvicornWorker
 
-            assert "accesslog" not in options
+        assert AirflowUvicornWorker.CONFIG_KWARGS["log_config"] is None
+        assert AirflowUvicornWorker.CONFIG_KWARGS["access_log"] is False
+        assert issubclass(AirflowUvicornWorker, UvicornWorker)
diff --git a/shared/logging/src/airflow_shared/logging/structlog.py 
b/shared/logging/src/airflow_shared/logging/structlog.py
index 5464d578a10..fdf67da5004 100644
--- a/shared/logging/src/airflow_shared/logging/structlog.py
+++ b/shared/logging/src/airflow_shared/logging/structlog.py
@@ -545,7 +545,7 @@ def configure_logging(
                 "level": log_level.upper(),
                 "class": "logging.StreamHandler",
                 "formatter": "structlog",
-                "stream": output,
+                "stream": output if output is not None else sys.stdout,
             },
         }
     )
@@ -566,6 +566,79 @@ def configure_logging(
 
     logging.config.dictConfig(config)
 
+    if json_output:
+        _install_excepthook()
+
+    _WarningsInterceptor.register(_showwarning)
+
+
+class _WarningsInterceptor:
+    """Holds a reference to the original ``warnings.showwarning`` so it can be 
restored."""
+
+    _original_showwarning: Callable | None = None
+
+    @staticmethod
+    def register(new_callable: Callable) -> None:
+        import warnings
+
+        if _WarningsInterceptor._original_showwarning is None:
+            _WarningsInterceptor._original_showwarning = warnings.showwarning
+        warnings.showwarning = new_callable
+
+    @staticmethod
+    def reset() -> None:
+        import warnings
+
+        if _WarningsInterceptor._original_showwarning is not None:
+            warnings.showwarning = _WarningsInterceptor._original_showwarning
+            _WarningsInterceptor._original_showwarning = None
+
+    @staticmethod
+    def emit_warning(*args: Any) -> None:
+        if _WarningsInterceptor._original_showwarning is not None:
+            _WarningsInterceptor._original_showwarning(*args)
+
+
+def _showwarning(
+    message: Warning | str,
+    category: type[Warning],
+    filename: str,
+    lineno: int,
+    file: TextIO | None = None,
+    line: str | None = None,
+) -> None:
+    """
+    Redirect Python warnings to structlog.
+
+    If ``file`` is not None the warning is forwarded to the original handler
+    (e.g. when warnings are written directly to a file handle). Otherwise the
+    warning is emitted as a structured log event on the ``py.warnings`` logger
+    so it flows through the same processor chain as all other log output.
+    """
+    if file is not None:
+        _WarningsInterceptor.emit_warning(message, category, filename, lineno, 
file, line)
+    else:
+        warning_log = reconfigure_logger(
+            structlog.get_logger("py.warnings").bind(),
+            structlog.processors.CallsiteParameterAdder,
+        )
+        warning_log.warning(str(message), category=category.__name__, 
filename=filename, lineno=lineno)
+
+
+def _install_excepthook() -> None:
+    """Replace sys.excepthook so unhandled exceptions are emitted via 
structlog."""
+    _original = sys.excepthook
+
+    def _excepthook(exc_type: type, exc_value: BaseException, exc_tb) -> None:
+        if issubclass(exc_type, KeyboardInterrupt):
+            _original(exc_type, exc_value, exc_tb)
+            return
+        structlog.get_logger("unhandled_exception").critical(
+            "Unhandled exception", exc_info=(exc_type, exc_value, exc_tb)
+        )
+
+    sys.excepthook = _excepthook
+
 
 def init_log_folder(directory: str | os.PathLike[str], new_folder_permissions: 
int):
     """
diff --git a/shared/logging/tests/logging/test_structlog.py 
b/shared/logging/tests/logging/test_structlog.py
index 42d1e715aef..c6f91edf713 100644
--- a/shared/logging/tests/logging/test_structlog.py
+++ b/shared/logging/tests/logging/test_structlog.py
@@ -390,3 +390,169 @@ def 
test_logger_respects_configured_level(structlog_config):
 
     written = sio.getvalue()
     assert "[my_logger] Debug message\n" in written
+
+
+def test_excepthook_installed_when_json_output_true(structlog_config):
+    import sys
+
+    original = sys.excepthook
+    try:
+        with structlog_config(json_output=True):
+            assert sys.excepthook is not original
+    finally:
+        sys.excepthook = original
+
+
+def test_excepthook_not_installed_when_json_output_false(structlog_config):
+    import sys
+
+    original = sys.excepthook
+    with structlog_config(json_output=False):
+        assert sys.excepthook is original
+
+
+def 
test_excepthook_routes_unhandled_exception_through_structlog(structlog_config):
+    import sys
+
+    original = sys.excepthook
+    try:
+        with structlog_config(json_output=True) as sio:
+            sys.excepthook(ValueError, ValueError("boom"), None)
+        output = sio.getvalue().decode()
+        assert "unhandled_exception" in output
+        assert "boom" in output
+    finally:
+        sys.excepthook = original
+
+
+def test_excepthook_passes_keyboard_interrupt_to_original():
+    import sys
+
+    from airflow_shared.logging.structlog import _install_excepthook
+
+    calls = []
+    original = sys.excepthook
+
+    def spy(et, ev, tb):
+        calls.append(et)
+
+    sys.excepthook = spy
+    try:
+        _install_excepthook()
+        sys.excepthook(KeyboardInterrupt, KeyboardInterrupt(), None)
+        assert calls == [KeyboardInterrupt]
+    finally:
+        sys.excepthook = original
+
+
+class TestWarningsInterceptor:
+    @pytest.fixture(autouse=True)
+    def reset(self):
+        from airflow_shared.logging.structlog import _WarningsInterceptor
+
+        _WarningsInterceptor.reset()
+        yield
+        _WarningsInterceptor.reset()
+
+    def test_register_replaces_showwarning(self):
+        import warnings
+
+        from airflow_shared.logging.structlog import _WarningsInterceptor
+
+        current = warnings.showwarning
+        sentinel = mock.MagicMock()
+        _WarningsInterceptor.register(sentinel)
+        assert warnings.showwarning is sentinel
+        assert _WarningsInterceptor._original_showwarning is current
+
+    def test_register_is_idempotent(self):
+        import warnings
+
+        from airflow_shared.logging.structlog import _WarningsInterceptor
+
+        pre_register = warnings.showwarning
+        _WarningsInterceptor.register(mock.MagicMock())
+        _WarningsInterceptor.register(mock.MagicMock())
+        assert _WarningsInterceptor._original_showwarning is pre_register
+
+    def test_reset_restores_original(self):
+        import warnings
+
+        from airflow_shared.logging.structlog import _WarningsInterceptor
+
+        pre_register = warnings.showwarning
+        _WarningsInterceptor.register(mock.MagicMock())
+        _WarningsInterceptor.reset()
+        assert warnings.showwarning is pre_register
+        assert _WarningsInterceptor._original_showwarning is None
+
+    def test_reset_when_not_registered_is_noop(self):
+        import warnings
+
+        from airflow_shared.logging.structlog import _WarningsInterceptor
+
+        pre_reset = warnings.showwarning
+        _WarningsInterceptor.reset()
+        assert warnings.showwarning is pre_reset
+
+    def test_emit_warning_delegates_to_original(self):
+        from airflow_shared.logging.structlog import _WarningsInterceptor
+
+        sentinel = mock.MagicMock()
+        _WarningsInterceptor._original_showwarning = sentinel
+        _WarningsInterceptor.emit_warning("msg", UserWarning, "file.py", 1)
+        sentinel.assert_called_once_with("msg", UserWarning, "file.py", 1)
+        _WarningsInterceptor._original_showwarning = None
+
+    def test_emit_warning_when_not_registered_is_noop(self):
+        from airflow_shared.logging.structlog import _WarningsInterceptor
+
+        _WarningsInterceptor._original_showwarning = None
+        _WarningsInterceptor.emit_warning("msg", UserWarning, "file.py", 1)
+
+
+class TestShowwarning:
+    @pytest.fixture(autouse=True)
+    def reset(self):
+        from airflow_shared.logging.structlog import _WarningsInterceptor
+
+        _WarningsInterceptor.reset()
+        yield
+        _WarningsInterceptor.reset()
+
+    def test_with_file_delegates_to_original(self):
+        from airflow_shared.logging.structlog import _showwarning, 
_WarningsInterceptor
+
+        sentinel = mock.MagicMock()
+        _WarningsInterceptor._original_showwarning = sentinel
+        fake_file = mock.MagicMock()
+        _showwarning("msg", UserWarning, "file.py", 42, file=fake_file)
+        sentinel.assert_called_once_with("msg", UserWarning, "file.py", 42, 
fake_file, None)
+        _WarningsInterceptor._original_showwarning = None
+
+    def test_without_file_logs_to_structlog(self):
+        from airflow_shared.logging.structlog import _showwarning
+
+        with structlog.testing.capture_logs() as captured:
+            _showwarning("deprecated feature", DeprecationWarning, 
"myfile.py", 10)
+
+        assert len(captured) == 1
+        event = captured[0]
+        assert event["log_level"] == "warning"
+        assert event["event"] == "deprecated feature"
+        assert event["category"] == "DeprecationWarning"
+        assert event["filename"] == "myfile.py"
+        assert event["lineno"] == 10
+
+    def test_without_file_uses_py_warnings_logger(self):
+        from airflow_shared.logging import structlog as structlog_module
+        from airflow_shared.logging.structlog import _showwarning
+
+        with mock.patch.object(structlog_module.structlog, "get_logger") as 
mock_get_logger:
+            mock_bound = mock.MagicMock()
+            mock_bound.bind.return_value = mock_bound
+            mock_get_logger.return_value = mock_bound
+            with mock.patch.object(structlog_module, "reconfigure_logger", 
return_value=mock_bound):
+                _showwarning("some warning", UserWarning, "foo.py", 1)
+
+        mock_get_logger.assert_called_once_with("py.warnings")
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index a965e34b8ce..f93abe1a722 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -17,8 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import warnings
-from collections.abc import Callable
 from functools import cache
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, BinaryIO, TextIO
@@ -55,29 +53,6 @@ class _ActiveLoggingConfig:
         cls.logging_config_loaded = True
 
 
-class _WarningsInterceptor:
-    """A class to hold the reference to the original warnings.showwarning function."""
-
-    _original_showwarning: Callable | None = None
-
-    @staticmethod
-    def register(new_callable: Callable) -> None:
-        if _WarningsInterceptor._original_showwarning is None:
-            _WarningsInterceptor._original_showwarning = warnings.showwarning
-        warnings.showwarning = new_callable
-
-    @staticmethod
-    def reset() -> None:
-        if _WarningsInterceptor._original_showwarning is not None:
-            warnings.showwarning = _WarningsInterceptor._original_showwarning
-            _WarningsInterceptor._original_showwarning = None
-
-    @staticmethod
-    def emit_warning(*args: Any) -> None:
-        if _WarningsInterceptor._original_showwarning is not None:
-            _WarningsInterceptor._original_showwarning(*args)
-
-
 def mask_logs(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
     event_dict = redact(event_dict)  # type: ignore[assignment]
     return event_dict
@@ -158,8 +133,6 @@ def configure_logging(
         callsite_parameters=callsite_params,
     )
 
-    _WarningsInterceptor.register(_showwarning)
-
 
 def logger_at_level(name: str, level: int) -> Logger:
     """Create a new logger at the given level."""
@@ -298,37 +271,8 @@ def reset_logging():
 
     :meta private:
     """
-    from airflow.sdk._shared.logging.structlog import structlog_processors
+    from airflow.sdk._shared.logging.structlog import _WarningsInterceptor, structlog_processors
 
     _WarningsInterceptor.reset()
     structlog_processors.cache_clear()
     logging_processors.cache_clear()
-
-
-def _showwarning(
-    message: Warning | str,
-    category: type[Warning],
-    filename: str,
-    lineno: int,
-    file: TextIO | None = None,
-    line: str | None = None,
-) -> Any:
-    """
-    Redirects warnings to structlog so they appear in task logs etc.
-
-    Implementation of showwarnings which redirects to logging, which will first
-    check to see if the file parameter is None. If a file is specified, it will
-    delegate to the original warnings implementation of showwarning. Otherwise,
-    it will call warnings.formatwarning and will log the resulting string to a
-    warnings logger named "py.warnings" with level logging.WARNING.
-    """
-    if file is not None:
-        _WarningsInterceptor.emit_warning(message, category, filename, lineno, file, line)
-    else:
-        from airflow.sdk._shared.logging.structlog import reconfigure_logger
-
-        log = reconfigure_logger(
-            structlog.get_logger("py.warnings").bind(), structlog.processors.CallsiteParameterAdder
-        )
-
-        log.warning(str(message), category=category.__name__, filename=filename, lineno=lineno)

Reply via email to