This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0266b0b14eb Add option to have structured JSON logging for _all_ API
server output (#63365)
0266b0b14eb is described below
commit 0266b0b14eb6312382ae6af7b033084bc83bb9ff
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Mar 12 19:56:16 2026 +0000
Add option to have structured JSON logging for _all_ API server output
(#63365)
Previously gunicorn/uvicorn each installed their own log handlers and
formatters, bypassing Airflow's structlog ProcessorFormatter entirely. This
meant:
- Gunicorn set up its own StreamHandler on gunicorn.error / gunicorn.access,
so those records never went through structlog.
- Uvicorn workers called logging.config.dictConfig(LOGGING_CONFIG) on
startup,
overwriting the structlog configuration that was applied before gunicorn
started.
- HTTP access log lines came from uvicorn's built-in access logger
(unstructured).
- Python warnings.warn() calls and unhandled exceptions bypassed structlog
too.
Now:
- AirflowGunicornLogger overrides setup() to skip installing gunicorn's own
handlers and lets records propagate to root (where structlog is
configured).
- AirflowUvicornWorker sets log_config=None so uvicorn doesn't clobber the
logging config, and access_log=False since we handle that ourselves.
- HttpAccessLogMiddleware replaces uvicorn's access log: one structured
event
per request with method, path, status code, duration (µs), client address,
and the x-request-id header bound to the structlog context for the request
lifetime. Health-check paths are excluded to avoid noise.
- configure_logging() now explicitly silences uvicorn.access /
gunicorn.access
and routes uvicorn.error / gunicorn.error through the default handler.
- Python warnings are intercepted and emitted as structured py.warnings log
events instead of going to stderr.
- In JSON log mode, unhandled exceptions are emitted via structlog rather
than
the plain-text default sys.excepthook.
---
airflow-core/newsfragments/63365.significant.rst | 9 ++
.../auth/managers/simple/simple_auth_manager.py | 12 +-
.../airflow/api_fastapi/common/http_access_log.py | 106 +++++++++++++
.../src/airflow/api_fastapi/core_api/app.py | 11 +-
.../src/airflow/api_fastapi/gunicorn_app.py | 49 ++++--
.../src/airflow/cli/commands/api_server_command.py | 71 ++++-----
.../src/airflow/cli/commands/db_command.py | 19 +--
.../src/airflow/cli/commands/kerberos_command.py | 3 +-
.../src/airflow/cli/commands/scheduler_command.py | 3 +-
.../src/airflow/cli/commands/triggerer_command.py | 3 +-
.../src/airflow/config_templates/config.yml | 10 ++
airflow-core/src/airflow/logging_config.py | 18 ++-
.../0101_3_2_0_ui_improvements_for_deadlines.py | 74 +++++----
airflow-core/src/airflow/utils/cli.py | 8 +
airflow-core/src/airflow/utils/db.py | 12 +-
airflow-core/src/airflow/utils/helpers.py | 10 +-
airflow-core/src/airflow/utils/serve_logs/core.py | 13 +-
.../api_fastapi/common/test_http_access_log.py | 132 ++++++++++++++++
.../unit/cli/commands/test_api_server_command.py | 6 +-
.../unit/cli/commands/test_gunicorn_monitor.py | 33 ++--
.../src/airflow_shared/logging/structlog.py | 75 +++++++++-
shared/logging/tests/logging/test_structlog.py | 166 +++++++++++++++++++++
task-sdk/src/airflow/sdk/log.py | 58 +------
23 files changed, 696 insertions(+), 205 deletions(-)
diff --git a/airflow-core/newsfragments/63365.significant.rst
b/airflow-core/newsfragments/63365.significant.rst
new file mode 100644
index 00000000000..494055ec525
--- /dev/null
+++ b/airflow-core/newsfragments/63365.significant.rst
@@ -0,0 +1,9 @@
+Structured JSON logging for all API server output
+
+The new ``json_logs`` option under the ``[logging]`` section makes Airflow
+produce all its output as newline-delimited JSON (structured logs) instead of
+human-readable formatted logs. This covers the API server (gunicorn/uvicorn),
+including access logs, warnings, and unhandled exceptions.
+
+Not all components support this yet — notably ``airflow celery worker`` — but
+any non-JSON output when ``json_logs`` is enabled will be treated as a bug.
diff --git
a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
index 4d7c9850351..0f89fd20c4c 100644
---
a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
+++
b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
@@ -433,7 +433,11 @@ class
SimpleAuthManager(BaseAuthManager[SimpleAuthManagerUser]):
@staticmethod
def _print_output(output: str):
- name = "Simple auth manager"
- colorized_name = colored(f"{name:10}", "white")
- for line in output.splitlines():
- print(f"{colorized_name} | {line.strip()}")
+ if conf.getboolean("logging", "json_logs", fallback=False):
+ for line in output.splitlines():
+ log.info(line.strip())
+ else:
+ name = "Simple auth manager"
+ colorized_name = colored(f"{name:10}", "white")
+ for line in output.splitlines():
+ print(f"{colorized_name} | {line.strip()}")
diff --git a/airflow-core/src/airflow/api_fastapi/common/http_access_log.py
b/airflow-core/src/airflow/api_fastapi/common/http_access_log.py
new file mode 100644
index 00000000000..581c3593790
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/common/http_access_log.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""HTTP access log middleware using structlog."""
+
+from __future__ import annotations
+
+import contextlib
+import time
+from typing import TYPE_CHECKING
+
+import structlog
+
+if TYPE_CHECKING:
+ from starlette.types import ASGIApp, Message, Receive, Scope, Send
+
+logger = structlog.get_logger(logger_name="http.access")
+
+_HEALTH_PATHS = frozenset(["/api/v2/monitor/health"])
+
+
+class HttpAccessLogMiddleware:
+ """
+ Log completed HTTP requests as structured log events.
+
+ This middleware replaces uvicorn's built-in access logger. It measures the
+ full round-trip duration, binds any ``x-request-id`` header value to the
+ structlog context for the duration of the request, and emits one log event
+ per completed request.
+
+ Health-check paths are excluded to avoid log noise.
+ """
+
+ def __init__(
+ self,
+ app: ASGIApp,
+ request_id_header: str = "x-request-id",
+ ) -> None:
+ self.app = app
+ self.request_id_header = request_id_header.lower().encode("ascii")
+
+ async def __call__(self, scope: Scope, receive: Receive, send: Send) ->
None:
+ if scope["type"] != "http":
+ await self.app(scope, receive, send)
+ return
+
+ start = time.monotonic_ns()
+ response: Message | None = None
+
+ async def capture_send(message: Message) -> None:
+ nonlocal response
+ if message["type"] == "http.response.start":
+ response = message
+ await send(message)
+
+ request_id: str | None = None
+ for name, value in scope["headers"]:
+ if name == self.request_id_header:
+ request_id = value.decode("ascii", errors="replace")
+ break
+
+ ctx = (
+ structlog.contextvars.bound_contextvars(request_id=request_id)
+ if request_id is not None
+ else contextlib.nullcontext()
+ )
+
+ with ctx:
+ try:
+ await self.app(scope, receive, capture_send)
+ except Exception:
+ if response is None:
+ response = {"status": 500}
+ raise
+ finally:
+ path = scope["path"]
+ if path not in _HEALTH_PATHS:
+ duration_us = (time.monotonic_ns() - start) // 1000
+ status = response["status"] if response is not None else 0
+ method = scope.get("method", "")
+ query = scope["query_string"].decode("ascii",
errors="replace")
+ client = scope.get("client")
+ client_addr = f"{client[0]}:{client[1]}" if client else
None
+
+ logger.info(
+ "request finished",
+ method=method,
+ path=path,
+ query=query,
+ status_code=status,
+ duration_us=duration_us,
+ client_addr=client_addr,
+ )
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/app.py
b/airflow-core/src/airflow/api_fastapi/core_api/app.py
index 5c0bc562f12..2622aec86b9 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/app.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/app.py
@@ -165,6 +165,7 @@ def init_error_handlers(app: FastAPI) -> None:
def init_middlewares(app: FastAPI) -> None:
from airflow.api_fastapi.auth.middlewares.refresh_token import
JWTRefreshMiddleware
+ from airflow.api_fastapi.common.http_access_log import
HttpAccessLogMiddleware
app.add_middleware(JWTRefreshMiddleware)
if conf.getboolean("core", "simple_auth_manager_all_admins"):
@@ -172,8 +173,10 @@ def init_middlewares(app: FastAPI) -> None:
app.add_middleware(SimpleAllAdminMiddleware)
- # The GzipMiddleware should be the last middleware added as
https://github.com/apache/airflow/issues/60165 points out.
- # Compress responses greater than 1kB with optimal compression level as 5
- # with level ranging from 1 to 9 with 1 (fastest, least compression)
- # and 9 (slowest, most compression)
+ # GZipMiddleware must be inside HttpAccessLogMiddleware so that access
logs capture
+ # the full end-to-end duration including compression time.
+ # See https://github.com/apache/airflow/issues/60165
app.add_middleware(GZipMiddleware, minimum_size=1024, compresslevel=5)
+ # HttpAccessLogMiddleware must be outermost (added last) so it times the
full
+ # request lifecycle including all inner middleware.
+ app.add_middleware(HttpAccessLogMiddleware)
diff --git a/airflow-core/src/airflow/api_fastapi/gunicorn_app.py
b/airflow-core/src/airflow/api_fastapi/gunicorn_app.py
index e52a681b006..c01d3e8b2aa 100644
--- a/airflow-core/src/airflow/api_fastapi/gunicorn_app.py
+++ b/airflow-core/src/airflow/api_fastapi/gunicorn_app.py
@@ -30,14 +30,16 @@ The pattern follows gunicorn's recommended extension
approach:
from __future__ import annotations
-import logging
import signal
import sys
import time
from typing import TYPE_CHECKING, Any
+import structlog
from gunicorn.app.base import BaseApplication
from gunicorn.arbiter import Arbiter
+from gunicorn.glogging import Logger as GunicornLogger
+from uvicorn.workers import UvicornWorker
from airflow.configuration import conf
@@ -45,7 +47,39 @@ if TYPE_CHECKING:
from fastapi import FastAPI
from gunicorn.app.base import Application
-log = logging.getLogger(__name__)
+log = structlog.get_logger(__name__)
+
+
+class AirflowGunicornLogger(GunicornLogger):
+ """
+ Gunicorn logger that routes all output through Airflow's logging setup.
+
+ Gunicorn's default Logger.setup() installs its own StreamHandler with a
custom
+ formatter on ``gunicorn.error`` and ``gunicorn.access``, bypassing any
root-level
+ handler (including our structlog ProcessorFormatter). Overriding setup()
to do
+ nothing lets records propagate to root where Airflow's handler picks them
up.
+ """
+
+ def setup(self, cfg) -> None:
+ self.error_log.propagate = True
+ self.access_log.propagate = True
+
+
+class AirflowUvicornWorker(UvicornWorker):
+ """
+ Uvicorn worker that preserves Airflow's structlog-based logging setup.
+
+ Uvicorn workers normally call
``logging.config.dictConfig(LOGGING_CONFIG)`` on startup
+ which would override any structlog configuration applied before gunicorn
starts.
+ Setting ``log_config=None`` prevents that. ``access_log=False`` disables
uvicorn's
+ built-in access logger because ``HttpAccessLogMiddleware`` handles access
logging.
+ """
+
+ CONFIG_KWARGS = {
+ **UvicornWorker.CONFIG_KWARGS,
+ "log_config": None,
+ "access_log": False,
+ }
class AirflowArbiter(Arbiter):
@@ -198,8 +232,7 @@ class AirflowGunicornApp(BaseApplication):
try:
AirflowArbiter(self).run()
except RuntimeError as e:
- print(f"\nError: {e}\n", file=sys.stderr)
- sys.stderr.flush()
+ log.error("Gunicorn failed to start", error=str(e))
sys.exit(1)
@@ -210,7 +243,6 @@ def create_gunicorn_app(
worker_timeout: int,
ssl_cert: str | None = None,
ssl_key: str | None = None,
- access_log: bool = True,
log_level: str = "info",
proxy_headers: bool = False,
) -> AirflowGunicornApp:
@@ -223,18 +255,18 @@ def create_gunicorn_app(
:param worker_timeout: Worker timeout in seconds
:param ssl_cert: Path to SSL certificate file
:param ssl_key: Path to SSL key file
- :param access_log: Whether to enable access logging
:param log_level: Log level (debug, info, warning, error, critical)
:param proxy_headers: Whether to trust proxy headers
"""
options = {
"bind": f"{host}:{port}",
"workers": num_workers,
- "worker_class": "uvicorn.workers.UvicornWorker",
+ "worker_class":
"airflow.api_fastapi.gunicorn_app.AirflowUvicornWorker",
"timeout": worker_timeout,
"graceful_timeout": worker_timeout,
"keepalive": worker_timeout,
"loglevel": log_level,
+ "logger_class":
"airflow.api_fastapi.gunicorn_app.AirflowGunicornLogger",
"preload_app": True,
# Use our gunicorn_config module for hooks (post_worker_init,
worker_exit)
"config": "python:airflow.api_fastapi.gunicorn_config",
@@ -244,9 +276,6 @@ def create_gunicorn_app(
options["certfile"] = ssl_cert
options["keyfile"] = ssl_key
- if access_log:
- options["accesslog"] = "-" # Log to stdout
-
if proxy_headers:
options["forwarded_allow_ips"] = "*"
diff --git a/airflow-core/src/airflow/cli/commands/api_server_command.py
b/airflow-core/src/airflow/cli/commands/api_server_command.py
index dc7b4fabf9a..11b57305b1a 100644
--- a/airflow-core/src/airflow/cli/commands/api_server_command.py
+++ b/airflow-core/src/airflow/cli/commands/api_server_command.py
@@ -18,7 +18,6 @@
from __future__ import annotations
-import logging
import os
import sys
import textwrap
@@ -26,9 +25,9 @@ from collections.abc import Callable
from functools import wraps
from typing import TYPE_CHECKING, TypeVar
+import structlog
import uvicorn
-from airflow import settings
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
@@ -40,7 +39,7 @@ from airflow.utils.providers_configuration_loader import
providers_configuration
PS = ParamSpec("PS")
RT = TypeVar("RT")
-log = logging.getLogger(__name__)
+log = structlog.get_logger(__name__)
if TYPE_CHECKING:
from argparse import Namespace
@@ -68,19 +67,6 @@ def _run_api_server_with_gunicorn(
ssl_cert, ssl_key = _get_ssl_cert_and_key_filepaths(args)
log_level = conf.get("logging", "uvicorn_logging_level",
fallback="info").lower()
- access_log_enabled = log_level not in ("error", "critical", "fatal")
-
- log.info(
- textwrap.dedent(
- f"""\
- Running the API server with gunicorn:
- Apps: {apps}
- Workers: {num_workers}
- Host: {args.host}:{args.port}
- Timeout: {worker_timeout}
-
================================================================="""
- )
- )
gunicorn_app = create_gunicorn_app(
host=args.host,
@@ -89,7 +75,6 @@ def _run_api_server_with_gunicorn(
worker_timeout=worker_timeout,
ssl_cert=ssl_cert,
ssl_key=ssl_key,
- access_log=access_log_enabled,
log_level=log_level,
proxy_headers=proxy_headers,
)
@@ -122,10 +107,7 @@ def _run_api_server_with_uvicorn(
setproctitle(f"airflow api_server -- host:{args.host}
port:{args.port}")
- # Get uvicorn logging configuration from Airflow settings
uvicorn_log_level = conf.get("logging", "uvicorn_logging_level",
fallback="info").lower()
- # Control access log based on uvicorn log level - disable for ERROR and
above
- access_log_enabled = uvicorn_log_level not in ("error", "critical",
"fatal")
uvicorn_kwargs = {
"host": args.host,
@@ -136,11 +118,13 @@ def _run_api_server_with_uvicorn(
"timeout_worker_healthcheck": worker_timeout,
"ssl_keyfile": ssl_key,
"ssl_certfile": ssl_cert,
- "access_log": access_log_enabled,
+ # HttpAccessLogMiddleware handles access logging; disable uvicorn's
built-in access log.
+ "access_log": False,
"log_level": uvicorn_log_level,
"proxy_headers": proxy_headers,
+ # Prevent uvicorn from overriding our structlog-based logging setup.
+ "log_config": None,
}
- # Only set the log_config if it is provided, otherwise use the default
uvicorn logging configuration.
if args.log_config and args.log_config != "-":
# The [api/log_config] is migrated from [api/access_logfile] and
[api/access_logfile] defaults to "-" for stdout for Gunicorn.
# So we need to check if the log_config is set to "-" or not; if it is
set to "-", we regard it as not set.
@@ -157,41 +141,50 @@ def _run_api_server(args, apps: str, num_workers: int,
worker_timeout: int, prox
"""Run the API server using the configured server type."""
server_type = conf.get("api", "server_type", fallback="uvicorn").lower()
+ run = _run_api_server_with_uvicorn
if server_type == "gunicorn":
try:
import gunicorn # noqa: F401
+
+ run = _run_api_server_with_gunicorn
except ImportError:
raise AirflowConfigException(
"Gunicorn is not installed. Install it with: pip install
'apache-airflow-core[gunicorn]'"
)
- _run_api_server_with_gunicorn(
- args=args,
+ log_file = args.log_file or None
+ if conf.getboolean("logging", "json_logs", fallback=False):
+ extra = {"logfile": log_file} if log_file else {}
+ log.info(
+ "Running the API server",
+ server=server_type,
apps=apps,
- num_workers=num_workers,
- worker_timeout=worker_timeout,
- proxy_headers=proxy_headers,
+ workers=num_workers,
+ host=f"{args.host}:{args.port}",
+ timeout=worker_timeout,
+ **extra,
)
else:
- log.info(
+ print(
textwrap.dedent(
f"""\
- Running the API server with uvicorn:
+ Running the API server with {server_type}:
Apps: {apps}
Workers: {num_workers}
Host: {args.host}:{args.port}
Timeout: {worker_timeout}
- Logfiles: {args.log_file or "-"}
-
================================================================="""
+ Logfiles: {log_file or "-"}
+
=================================================================""",
)
)
- _run_api_server_with_uvicorn(
- args=args,
- apps=apps,
- num_workers=num_workers,
- worker_timeout=worker_timeout,
- proxy_headers=proxy_headers,
- )
+
+ run(
+ args=args,
+ apps=apps,
+ num_workers=num_workers,
+ worker_timeout=worker_timeout,
+ proxy_headers=proxy_headers,
+ )
def with_api_apps_env(func: Callable[[Namespace], RT]) ->
Callable[[Namespace], RT]:
@@ -221,7 +214,7 @@ def with_api_apps_env(func: Callable[[Namespace], RT]) ->
Callable[[Namespace],
@with_api_apps_env
def api_server(args: Namespace):
"""Start Airflow API server."""
- print(settings.HEADER)
+ cli_utils.print_banner()
apps = args.apps
num_workers = args.workers
diff --git a/airflow-core/src/airflow/cli/commands/db_command.py
b/airflow-core/src/airflow/cli/commands/db_command.py
index f5c1a061a1a..9a2a199707d 100644
--- a/airflow-core/src/airflow/cli/commands/db_command.py
+++ b/airflow-core/src/airflow/cli/commands/db_command.py
@@ -18,12 +18,12 @@
from __future__ import annotations
-import logging
import os
import textwrap
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING
+import structlog
from packaging.version import InvalidVersion, parse as parse_version
from tenacity import Retrying, stop_after_attempt, wait_fixed
@@ -38,7 +38,7 @@ from airflow.utils.providers_configuration_loader import
providers_configuration
if TYPE_CHECKING:
from tenacity import RetryCallState
-log = logging.getLogger(__name__)
+log = structlog.get_logger(__name__)
@providers_configuration_loaded
@@ -94,7 +94,7 @@ def run_db_migrate_command(args, command, revision_heads_map:
dict[str, str]):
:meta private:
"""
- print(f"DB: {settings.get_engine().url!r}")
+ db_url = str(settings.get_engine().url)
if args.to_revision and args.to_version:
raise SystemExit("Cannot supply both `--to-revision` and
`--to-version`.")
if args.from_version and args.from_revision:
@@ -128,16 +128,16 @@ def run_db_migrate_command(args, command,
revision_heads_map: dict[str, str]):
to_revision = args.to_revision
if not args.show_sql_only:
- print(f"Performing upgrade to the metadata database
{settings.get_engine().url!r}")
+ log.info("Performing upgrade to the metadata database", url=db_url)
else:
- print("Generating sql for upgrade -- upgrade commands will *not* be
submitted.")
+ log.info("Generating sql for upgrade -- upgrade commands will *not* be
submitted.")
command(
to_revision=to_revision,
from_revision=from_revision,
show_sql_only=args.show_sql_only,
)
if not args.show_sql_only:
- print("Database migration done!")
+ log.info("Database migration done!")
def run_db_downgrade_command(args, command, revision_heads_map: dict[str,
str]):
@@ -171,10 +171,11 @@ def run_db_downgrade_command(args, command,
revision_heads_map: dict[str, str]):
raise SystemExit(f"Downgrading to version {args.to_version} is not
supported.")
elif args.to_revision:
to_revision = args.to_revision
+ db_url = str(settings.get_engine().url)
if not args.show_sql_only:
- print(f"Performing downgrade with database
{settings.get_engine().url!r}")
+ log.info("Performing downgrade with database", url=db_url)
else:
- print("Generating sql for downgrade -- downgrade commands will *not*
be submitted.")
+ log.info("Generating sql for downgrade -- downgrade commands will
*not* be submitted.")
if args.show_sql_only or (
args.yes
@@ -187,7 +188,7 @@ def run_db_downgrade_command(args, command,
revision_heads_map: dict[str, str]):
):
command(to_revision=to_revision, from_revision=from_revision,
show_sql_only=args.show_sql_only)
if not args.show_sql_only:
- print("Downgrade complete")
+ log.info("Downgrade complete")
else:
raise SystemExit("Cancelled")
diff --git a/airflow-core/src/airflow/cli/commands/kerberos_command.py
b/airflow-core/src/airflow/cli/commands/kerberos_command.py
index 827f2d6a9d0..05b4f860df5 100644
--- a/airflow-core/src/airflow/cli/commands/kerberos_command.py
+++ b/airflow-core/src/airflow/cli/commands/kerberos_command.py
@@ -18,7 +18,6 @@
from __future__ import annotations
-from airflow import settings
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.security import kerberos as krb
from airflow.security.kerberos import KerberosMode
@@ -30,7 +29,7 @@ from airflow.utils.providers_configuration_loader import
providers_configuration
@providers_configuration_loaded
def kerberos(args):
"""Start a kerberos ticket renewer."""
- print(settings.HEADER)
+ cli_utils.print_banner()
mode = KerberosMode.STANDARD
if args.one_time:
diff --git a/airflow-core/src/airflow/cli/commands/scheduler_command.py
b/airflow-core/src/airflow/cli/commands/scheduler_command.py
index f797d656257..b79bc25fb01 100644
--- a/airflow-core/src/airflow/cli/commands/scheduler_command.py
+++ b/airflow-core/src/airflow/cli/commands/scheduler_command.py
@@ -23,7 +23,6 @@ from argparse import Namespace
from contextlib import contextmanager
from multiprocessing import Process
-from airflow import settings
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader
@@ -53,7 +52,7 @@ def _run_scheduler_job(args) -> None:
@providers_configuration_loaded
def scheduler(args: Namespace):
"""Start Airflow Scheduler."""
- print(settings.HEADER)
+ cli_utils.print_banner()
if args.only_idle and args.num_runs <= 0:
raise SystemExit("The --only-idle flag requires --num-runs to be set
to a positive number.")
diff --git a/airflow-core/src/airflow/cli/commands/triggerer_command.py
b/airflow-core/src/airflow/cli/commands/triggerer_command.py
index ed8a08d1d73..8b9ee178a19 100644
--- a/airflow-core/src/airflow/cli/commands/triggerer_command.py
+++ b/airflow-core/src/airflow/cli/commands/triggerer_command.py
@@ -23,7 +23,6 @@ from contextlib import contextmanager
from functools import partial
from multiprocessing import Process
-from airflow import settings
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
@@ -68,7 +67,7 @@ def triggerer(args):
SecretsMasker.enable_log_masking()
- print(settings.HEADER)
+ cli_utils.print_banner()
if args.queues and not conf.getboolean("triggerer", "queues_enabled",
fallback=False):
raise AirflowConfigException(
"--queues option may only be used when triggerer.queues_enabled is
`True`."
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index f931fedc473..0c002f5276c 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -832,6 +832,16 @@ logging:
type: string
example: ~
default: "WARNING"
+ json_logs:
+ description: |
+ Enable JSON-structured logging for all output. When ``True``, every
log line emitted by
+ the process is a single-line JSON object (including HTTP access logs
from the API server).
+ Recommended for production / cloud-native deployments where logs are
collected by a log
+ aggregator.
+ version_added: "3.2.0"
+ type: boolean
+ example: "True"
+ default: "False"
uvicorn_logging_level:
description: |
Logging level for uvicorn (API server and serve-logs).
diff --git a/airflow-core/src/airflow/logging_config.py
b/airflow-core/src/airflow/logging_config.py
index da15d0c246a..0da017d9b08 100644
--- a/airflow-core/src/airflow/logging_config.py
+++ b/airflow-core/src/airflow/logging_config.py
@@ -115,14 +115,30 @@ def configure_logging():
log_format=getattr(logging_config, "LOG_FORMAT",
conf.get("logging", "log_format", fallback="")),
callsite_params=conf.getlist("logging", "callsite_parameters",
fallback=[]),
)
+ json_output = conf.getboolean("logging", "json_logs", fallback=False)
+
+ stdlib_config = dict(logging_config)
+ # Route uvicorn/gunicorn error loggers explicitly through our handler
so their output
+ # is formatted correctly regardless of what propagation state those
loggers end up in.
+ # Suppress the built-in access loggers; HttpAccessLogMiddleware and
+ # AirflowUvicornWorker.CONFIG_KWARGS take over access logging instead.
+ extra_loggers = {
+ "uvicorn.access": {"handlers": [], "propagate": False},
+ "gunicorn.access": {"handlers": [], "propagate": False},
+ "uvicorn.error": {"handlers": ["default"], "propagate": False},
+ "gunicorn.error": {"handlers": ["default"], "propagate": False},
+ }
+ stdlib_config = {**stdlib_config, "loggers":
{**stdlib_config.get("loggers", {}), **extra_loggers}}
+
configure_logging(
log_level=level,
namespace_log_levels=conf.get("logging", "namespace_levels",
fallback=None),
- stdlib_config=logging_config,
+ stdlib_config=stdlib_config,
log_format=log_fmt,
log_timestamp_format=conf.get("logging", "log_timestamp_format",
fallback="iso"),
callsite_parameters=callsite_params,
colors=colors,
+ json_output=json_output,
)
except (ValueError, KeyError) as e:
log.error("Unable to load the config, contains a configuration error.")
diff --git
a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
index 4188254ca2e..92bfc558d47 100644
---
a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
+++
b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
@@ -37,6 +37,7 @@ from collections.abc import Iterable
from typing import TYPE_CHECKING
import sqlalchemy as sa
+import structlog
import uuid6
from alembic import context, op
@@ -46,6 +47,9 @@ from airflow.serialization.enums import Encoding
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.sqlalchemy import UtcDateTime
+log = structlog.get_logger(__name__)
+
+
if TYPE_CHECKING:
from typing import Any
@@ -285,7 +289,7 @@ def validate_written_data(
).fetchone()
if not validation_result:
- print(f"ERROR: Failed to read back deadline_alert for DeadlineAlert
{deadline_alert_id}")
+ log.error("Failed to read back deadline_alert",
deadline_alert_id=deadline_alert_id)
return False
checks = [
@@ -296,7 +300,7 @@ def validate_written_data(
for name, actual, expected in checks:
if actual != expected:
- print(f"ERROR: Written {name} does not match expected! Written:
{actual}, Expected: {expected}")
+ log.error("Written value does not match expected", field=name,
actual=actual, expected=expected)
return False
return True
@@ -304,11 +308,9 @@ def validate_written_data(
def report_errors(errors: ErrorDict, operation: str = "migration") -> None:
if errors:
- print(f"{len(errors)} Dags encountered errors: ")
- for dag_id, error in errors.items():
- print(f" {dag_id}: {'; '.join(error)}")
+ log.warning("Dags encountered errors", operation=operation,
count=len(errors), errors=dict(errors))
else:
- print(f"No Dags encountered errors during {operation}.")
+ log.info("No Dags encountered errors", operation=operation)
def hash_dag(dag_data):
@@ -355,14 +357,10 @@ def _sort_serialized_dag_dict(serialized_dag: Any):
def migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
"""Extract DeadlineAlert data from serialized Dag data and populate
deadline_alert table."""
if context.is_offline_mode():
- print(
- """
- ------------
- -- WARNING: Unable to migrate DeadlineAlert data while in offline
mode!
- -- The deadline_alert table will remain empty in offline mode.
- -- Run the migration in online mode to populate the
deadline_alert table.
- ------------
- """
+ log.warning(
+ "Unable to migrate DeadlineAlert data while in offline mode -- "
+ "the deadline_alert table will remain empty. "
+ "Run the migration in online mode to populate the deadline_alert
table."
)
return
@@ -383,8 +381,7 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
).scalar()
total_batches = (total_dags + BATCH_SIZE - 1) // BATCH_SIZE
- print(f"Using migration_batch_size of {BATCH_SIZE} as set in Airflow
configuration.")
- print(f"Starting migration of {total_dags} Dags in {total_batches}
batches.\n")
+ log.info("Starting migration", batch_size=BATCH_SIZE,
total_dags=total_dags, total_batches=total_batches)
while True:
batch_num += 1
@@ -424,7 +421,7 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
if not batch_results:
break
- print(f"Processing batch {batch_num}...")
+ log.info("Processing batch", batch_num=batch_num,
total_batches=total_batches)
for serialized_dag_id, dag_id, data, data_compressed, created_at in
batch_results:
processed_dags.append(dag_id)
@@ -555,13 +552,15 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
dags_with_errors[dag_id].append(f"Could not process serialized
Dag: {e}")
savepoint.rollback()
- print(f"Batch {batch_num} of {total_batches} complete.")
+ log.info("Batch complete", batch_num=batch_num,
total_batches=total_batches)
- print(
- f"\nProcessed {len(processed_dags)} serialized_dag records
({len(set(processed_dags))} "
- f"unique Dags), {len(dags_with_deadlines)} had DeadlineAlerts."
+ log.info(
+ "Migration complete",
+ processed_records=len(processed_dags),
+ unique_dags=len(set(processed_dags)),
+ dags_with_deadlines=len(dags_with_deadlines),
+ migrated_alerts=migrated_alerts_count,
)
- print(f"Migrated {migrated_alerts_count} DeadlineAlert configurations.")
report_errors(dags_with_errors, "migration")
@@ -570,14 +569,10 @@ def migrate_deadline_alert_data_back_to_serialized_dag()
-> None:
from alembic import context
if context.is_offline_mode():
- print(
- """
- ------------
- -- WARNING: Unable to restore DeadlineAlert data while in offline
mode!
- -- The downgrade will skip data restoration in offline mode.
- -- Run the migration in online mode to restore the deadline_alert
data.
- ------------
- """
+ log.warning(
+ "Unable to restore DeadlineAlert data while in offline mode -- "
+ "the downgrade will skip data restoration. "
+ "Run the migration in online mode to restore the deadline_alert
data."
)
return
@@ -603,8 +598,7 @@ def migrate_deadline_alert_data_back_to_serialized_dag() ->
None:
total_batches = (total_dags + BATCH_SIZE - 1) // BATCH_SIZE
- print(f"Using migration_batch_size of {BATCH_SIZE} as set in Airflow
configuration.")
- print(f"Starting downgrade of {total_dags} Dags with DeadlineAlerts in
{total_batches} batches.\n")
+ log.info("Starting downgrade", batch_size=BATCH_SIZE,
total_dags=total_dags, total_batches=total_batches)
while True:
batch_num += 1
@@ -643,7 +637,7 @@ def migrate_deadline_alert_data_back_to_serialized_dag() ->
None:
batch_results = list(result)
if not batch_results:
break
- print(f"Processing batch {batch_num}...")
+ log.info("Processing batch", batch_num=batch_num,
total_batches=total_batches)
for serialized_dag_id, dag_id, data, data_compressed in batch_results:
processed_dags.append(dag_id)
@@ -660,7 +654,7 @@ def migrate_deadline_alert_data_back_to_serialized_dag() ->
None:
continue
if not all(isinstance(uuid_val, str) for uuid_val in
deadline_uuids):
- print(f"WARNING: Dag {dag_id} has non-string deadline
values, skipping")
+ log.warning("Dag has non-string deadline values,
skipping", dag_id=dag_id)
continue
dags_with_deadlines.add(dag_id)
@@ -704,11 +698,13 @@ def migrate_deadline_alert_data_back_to_serialized_dag()
-> None:
dags_with_errors[dag_id].append(f"Could not restore deadline:
{e}")
savepoint.rollback()
- print(f"Batch {batch_num} of {total_batches} complete.")
+ log.info("Batch complete", batch_num=batch_num,
total_batches=total_batches)
- print(
- f"\nProcessed {len(processed_dags)} serialized_dag records
({len(set(processed_dags))} "
- f"unique Dags), {len(dags_with_deadlines)} had DeadlineAlerts."
+ log.info(
+ "Downgrade complete",
+ processed_records=len(processed_dags),
+ unique_dags=len(set(processed_dags)),
+ dags_with_deadlines=len(dags_with_deadlines),
+ restored_alerts=restored_alerts_count,
)
- print(f"Restored {restored_alerts_count} DeadlineAlert configurations to
original format.")
report_errors(dags_with_errors, "downgrade")
diff --git a/airflow-core/src/airflow/utils/cli.py
b/airflow-core/src/airflow/utils/cli.py
index 4df33b26894..6af75dd5b5e 100644
--- a/airflow-core/src/airflow/utils/cli.py
+++ b/airflow-core/src/airflow/utils/cli.py
@@ -386,6 +386,14 @@ def setup_logging(filename):
return handler.stream
+def print_banner() -> None:
+ """Print the Airflow ASCII art banner, unless JSON logging is enabled."""
+ from airflow.configuration import conf
+
+ if not conf.getboolean("logging", "json_logs", fallback=False):
+ print(settings.HEADER)
+
+
def sigint_handler(sig, frame):
"""
Return without error on SIGINT or SIGTERM signals in interactive command
mode.
diff --git a/airflow-core/src/airflow/utils/db.py
b/airflow-core/src/airflow/utils/db.py
index 71f03ea91ce..93041bc188d 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -947,8 +947,9 @@ def check_and_run_migrations():
if sys.stdout.isatty() and verb:
print()
question = f"Please confirm database {verb} (or wait 4 seconds to skip
it). Are you sure? [y/N]"
+ print_fn = log.info if conf.getboolean("logging", "json_logs",
fallback=False) else print
try:
- answer = helpers.prompt_with_timeout(question, timeout=4,
default=False)
+ answer = helpers.prompt_with_timeout(question, timeout=4,
default=False, output_fn=print_fn)
if answer:
try:
db_command()
@@ -969,10 +970,11 @@ def check_and_run_migrations():
elif source_heads != db_heads:
from airflow.version import version
- print(
- f"ERROR: You need to {verb} the database. Please run `airflow db
{command_name}`. "
- f"Make sure the command is run using Airflow version {version}.",
- file=sys.stderr,
+ log.error(
+ "Database migration required. Please run `airflow db %s`. "
+ "Make sure the command is run using Airflow version %s.",
+ command_name,
+ version,
)
sys.exit(1)
diff --git a/airflow-core/src/airflow/utils/helpers.py
b/airflow-core/src/airflow/utils/helpers.py
index 50bd8b82a46..5e2dd1b9ded 100644
--- a/airflow-core/src/airflow/utils/helpers.py
+++ b/airflow-core/src/airflow/utils/helpers.py
@@ -63,12 +63,12 @@ def validate_key(k: str, max_length: int = 250):
)
-def ask_yesno(question: str, default: bool | None = None) -> bool:
+def ask_yesno(question: str, default: bool | None = None, output_fn=print) ->
bool:
"""Get a yes or no answer from the user."""
yes = {"yes", "y"}
no = {"no", "n"}
- print(question)
+ output_fn(question)
while True:
choice = input().lower()
if choice == "" and default is not None:
@@ -77,10 +77,10 @@ def ask_yesno(question: str, default: bool | None = None)
-> bool:
return True
if choice in no:
return False
- print("Please respond with y/yes or n/no.")
+ output_fn("Please respond with y/yes or n/no.")
-def prompt_with_timeout(question: str, timeout: int, default: bool | None =
None) -> bool:
+def prompt_with_timeout(question: str, timeout: int, default: bool | None =
None, output_fn=print) -> bool:
"""Ask the user a question and timeout if they don't respond."""
def handler(signum, frame):
@@ -89,7 +89,7 @@ def prompt_with_timeout(question: str, timeout: int, default:
bool | None = None
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout)
try:
- return ask_yesno(question, default)
+ return ask_yesno(question, default, output_fn=output_fn)
finally:
signal.alarm(0)
diff --git a/airflow-core/src/airflow/utils/serve_logs/core.py
b/airflow-core/src/airflow/utils/serve_logs/core.py
index c09a9f603f3..5b4ee5aa686 100644
--- a/airflow-core/src/airflow/utils/serve_logs/core.py
+++ b/airflow-core/src/airflow/utils/serve_logs/core.py
@@ -52,9 +52,18 @@ def serve_logs(port=None):
# Get uvicorn logging configuration from Airflow settings
uvicorn_log_level = conf.get("logging", "uvicorn_logging_level",
fallback="info").lower()
- # Use uvicorn directly for ASGI applications
+ # Use uvicorn directly for ASGI applications.
+ # log_config=None: preserve the process's structlog-based logging setup
rather than
+ # letting uvicorn reset it with its own default formatter.
+ # access_log=False: the log server serves internal file content; HTTP
access logs
+ # are not needed and would be non-JSON noise when json_logs=True.
uvicorn.run(
- "airflow.utils.serve_logs.log_server:get_app", host="", port=port,
log_level=uvicorn_log_level
+ "airflow.utils.serve_logs.log_server:get_app",
+ host="",
+ port=port,
+ log_level=uvicorn_log_level,
+ log_config=None,
+ access_log=False,
)
# Log serving is I/O bound and has low concurrency, so single process is
sufficient
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_http_access_log.py
b/airflow-core/tests/unit/api_fastapi/common/test_http_access_log.py
new file mode 100644
index 00000000000..3547fdf22ad
--- /dev/null
+++ b/airflow-core/tests/unit/api_fastapi/common/test_http_access_log.py
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import structlog
+import structlog.testing
+from starlette.applications import Starlette
+from starlette.responses import PlainTextResponse
+from starlette.routing import Route
+from starlette.testclient import TestClient
+
+from airflow.api_fastapi.common.http_access_log import _HEALTH_PATHS,
HttpAccessLogMiddleware
+
+
+def _make_app(raise_exc: bool = False) -> Starlette:
+ async def homepage(request):
+ if raise_exc:
+ raise RuntimeError("boom")
+ return PlainTextResponse("ok")
+
+ async def health(request):
+ return PlainTextResponse("healthy")
+
+ app = Starlette(
+ routes=[
+ Route("/", homepage),
+ Route("/api/v2/monitor/health", health),
+ ]
+ )
+ app.add_middleware(HttpAccessLogMiddleware)
+ return app
+
+
+def test_logs_request_fields():
+ with structlog.testing.capture_logs() as logs:
+ client = TestClient(_make_app(), raise_server_exceptions=False)
+ client.get("/?foo=bar")
+
+ assert len(logs) == 1
+ record = logs[0]
+ assert record["event"] == "request finished"
+ assert record["method"] == "GET"
+ assert record["path"] == "/"
+ assert record["query"] == "foo=bar"
+ assert record["status_code"] == 200
+ assert "duration_us" in record
+ assert isinstance(record["duration_us"], int)
+ assert record["duration_us"] >= 0
+ assert "client_addr" in record
+
+
+def test_health_path_not_logged():
+ with structlog.testing.capture_logs() as logs:
+ client = TestClient(_make_app(), raise_server_exceptions=False)
+ client.get("/api/v2/monitor/health")
+
+ assert logs == []
+
+
+def test_request_id_bound_to_context():
+ """request_id header is bound to structlog contextvars during the
request."""
+ captured_context: dict = {}
+
+ async def homepage(request):
+ captured_context.update(structlog.contextvars.get_contextvars())
+ return PlainTextResponse("ok")
+
+ app = Starlette(routes=[Route("/", homepage)])
+ app.add_middleware(HttpAccessLogMiddleware)
+
+ TestClient(app).get("/", headers={"x-request-id": "test-id-123"})
+
+ assert captured_context.get("request_id") == "test-id-123"
+
+
+def test_no_request_id_when_header_absent():
+ """No request_id is bound when the header is absent."""
+ captured_context: dict = {}
+
+ async def homepage(request):
+ captured_context.update(structlog.contextvars.get_contextvars())
+ return PlainTextResponse("ok")
+
+ app = Starlette(routes=[Route("/", homepage)])
+ app.add_middleware(HttpAccessLogMiddleware)
+
+ TestClient(app).get("/")
+
+ assert "request_id" not in captured_context
+
+
+def test_exception_logs_500_status():
+ with structlog.testing.capture_logs() as logs:
+ client = TestClient(_make_app(raise_exc=True),
raise_server_exceptions=False)
+ client.get("/")
+
+ assert len(logs) == 1
+ assert logs[0]["status_code"] == 500
+
+
+def test_non_http_scope_not_logged():
+ """Non-HTTP scopes (e.g. lifespan) are passed through without logging."""
+
+ async def lifespan_app(scope, receive, send):
+ pass
+
+ middleware = HttpAccessLogMiddleware(lifespan_app)
+
+ import asyncio
+
+ with structlog.testing.capture_logs() as logs:
+ asyncio.get_event_loop().run_until_complete(middleware({"type":
"lifespan"}, None, None))
+
+ assert logs == []
+
+
+def test_health_paths_constant():
+ assert "/api/v2/monitor/health" in _HEALTH_PATHS
diff --git a/airflow-core/tests/unit/cli/commands/test_api_server_command.py
b/airflow-core/tests/unit/cli/commands/test_api_server_command.py
index 58a0923a2f9..93c2c43e27b 100644
--- a/airflow-core/tests/unit/cli/commands/test_api_server_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_api_server_command.py
@@ -193,9 +193,10 @@ class TestCliApiServer(_CommonCLIUvicornTestClass):
"timeout_keep_alive": args.worker_timeout,
"timeout_graceful_shutdown": args.worker_timeout,
"timeout_worker_healthcheck": args.worker_timeout,
- "access_log": True,
+ "access_log": False,
"log_level": "info",
"proxy_headers": args.proxy_headers,
+ "log_config": None,
**expected_additional_kwargs,
},
)
@@ -246,9 +247,10 @@ class TestCliApiServer(_CommonCLIUvicornTestClass):
timeout_worker_healthcheck=60,
ssl_keyfile=None,
ssl_certfile=None,
- access_log=True,
+ access_log=False,
log_level="info",
proxy_headers=False,
+ log_config=None,
)
if demonize:
diff --git a/airflow-core/tests/unit/cli/commands/test_gunicorn_monitor.py
b/airflow-core/tests/unit/cli/commands/test_gunicorn_monitor.py
index 4964618a385..f80410c0a6c 100644
--- a/airflow-core/tests/unit/cli/commands/test_gunicorn_monitor.py
+++ b/airflow-core/tests/unit/cli/commands/test_gunicorn_monitor.py
@@ -342,8 +342,6 @@ class TestAirflowGunicornApp:
def test_load_config_gunicorn_cmd_args_overrides_options(self,
monkeypatch):
"""Test that GUNICORN_CMD_ARGS takes precedence over programmatic
options."""
monkeypatch.setenv("GUNICORN_CMD_ARGS", "--workers 8")
- from gunicorn.config import Config
-
from airflow.api_fastapi.gunicorn_app import AirflowGunicornApp
def mock_init(self, options):
@@ -416,8 +414,9 @@ class TestCreateGunicornApp:
assert options["bind"] == "0.0.0.0:8080"
assert options["workers"] == 4
assert options["timeout"] == 120
- assert options["worker_class"] == "uvicorn.workers.UvicornWorker"
+ assert options["worker_class"] ==
"airflow.api_fastapi.gunicorn_app.AirflowUvicornWorker"
assert options["preload_app"] is True
+ assert "accesslog" not in options
def test_create_app_with_ssl(self):
"""Test creating an app with SSL settings."""
@@ -455,8 +454,8 @@ class TestCreateGunicornApp:
assert options["forwarded_allow_ips"] == "*"
- def test_create_app_with_access_log(self):
- """Test creating an app with access logging enabled."""
+ def test_create_app_never_sets_accesslog(self):
+ """accesslog is never set; HttpAccessLogMiddleware handles HTTP access
logging."""
from airflow.api_fastapi.gunicorn_app import create_gunicorn_app
with mock.patch("airflow.api_fastapi.gunicorn_app.AirflowGunicornApp")
as mock_app_class:
@@ -465,26 +464,18 @@ class TestCreateGunicornApp:
port=8080,
num_workers=4,
worker_timeout=120,
- access_log=True,
)
options = mock_app_class.call_args[0][0]
- assert options["accesslog"] == "-"
-
- def test_create_app_without_access_log(self):
- """Test creating an app with access logging disabled."""
- from airflow.api_fastapi.gunicorn_app import create_gunicorn_app
+ assert "accesslog" not in options
- with mock.patch("airflow.api_fastapi.gunicorn_app.AirflowGunicornApp")
as mock_app_class:
- create_gunicorn_app(
- host="0.0.0.0",
- port=8080,
- num_workers=4,
- worker_timeout=120,
- access_log=False,
- )
+ def test_airflow_uvicorn_worker_config(self):
+ """AirflowUvicornWorker disables uvicorn's access log and log_config
override."""
+ from uvicorn.workers import UvicornWorker
- options = mock_app_class.call_args[0][0]
+ from airflow.api_fastapi.gunicorn_app import AirflowUvicornWorker
- assert "accesslog" not in options
+ assert AirflowUvicornWorker.CONFIG_KWARGS["log_config"] is None
+ assert AirflowUvicornWorker.CONFIG_KWARGS["access_log"] is False
+ assert issubclass(AirflowUvicornWorker, UvicornWorker)
diff --git a/shared/logging/src/airflow_shared/logging/structlog.py
b/shared/logging/src/airflow_shared/logging/structlog.py
index 5464d578a10..fdf67da5004 100644
--- a/shared/logging/src/airflow_shared/logging/structlog.py
+++ b/shared/logging/src/airflow_shared/logging/structlog.py
@@ -545,7 +545,7 @@ def configure_logging(
"level": log_level.upper(),
"class": "logging.StreamHandler",
"formatter": "structlog",
- "stream": output,
+ "stream": output if output is not None else sys.stdout,
},
}
)
@@ -566,6 +566,79 @@ def configure_logging(
logging.config.dictConfig(config)
+ if json_output:
+ _install_excepthook()
+
+ _WarningsInterceptor.register(_showwarning)
+
+
+class _WarningsInterceptor:
+ """Holds a reference to the original ``warnings.showwarning`` so it can be
restored."""
+
+ _original_showwarning: Callable | None = None
+
+ @staticmethod
+ def register(new_callable: Callable) -> None:
+ import warnings
+
+ if _WarningsInterceptor._original_showwarning is None:
+ _WarningsInterceptor._original_showwarning = warnings.showwarning
+ warnings.showwarning = new_callable
+
+ @staticmethod
+ def reset() -> None:
+ import warnings
+
+ if _WarningsInterceptor._original_showwarning is not None:
+ warnings.showwarning = _WarningsInterceptor._original_showwarning
+ _WarningsInterceptor._original_showwarning = None
+
+ @staticmethod
+ def emit_warning(*args: Any) -> None:
+ if _WarningsInterceptor._original_showwarning is not None:
+ _WarningsInterceptor._original_showwarning(*args)
+
+
+def _showwarning(
+ message: Warning | str,
+ category: type[Warning],
+ filename: str,
+ lineno: int,
+ file: TextIO | None = None,
+ line: str | None = None,
+) -> None:
+ """
+ Redirect Python warnings to structlog.
+
+ If ``file`` is not None the warning is forwarded to the original handler
+ (e.g. when warnings are written directly to a file handle). Otherwise the
+ warning is emitted as a structured log event on the ``py.warnings`` logger
+ so it flows through the same processor chain as all other log output.
+ """
+ if file is not None:
+ _WarningsInterceptor.emit_warning(message, category, filename, lineno,
file, line)
+ else:
+ warning_log = reconfigure_logger(
+ structlog.get_logger("py.warnings").bind(),
+ structlog.processors.CallsiteParameterAdder,
+ )
+ warning_log.warning(str(message), category=category.__name__,
filename=filename, lineno=lineno)
+
+
+def _install_excepthook() -> None:
+ """Replace sys.excepthook so unhandled exceptions are emitted via
structlog."""
+ _original = sys.excepthook
+
+ def _excepthook(exc_type: type, exc_value: BaseException, exc_tb) -> None:
+ if issubclass(exc_type, KeyboardInterrupt):
+ _original(exc_type, exc_value, exc_tb)
+ return
+ structlog.get_logger("unhandled_exception").critical(
+ "Unhandled exception", exc_info=(exc_type, exc_value, exc_tb)
+ )
+
+ sys.excepthook = _excepthook
+
def init_log_folder(directory: str | os.PathLike[str], new_folder_permissions:
int):
"""
diff --git a/shared/logging/tests/logging/test_structlog.py
b/shared/logging/tests/logging/test_structlog.py
index 42d1e715aef..c6f91edf713 100644
--- a/shared/logging/tests/logging/test_structlog.py
+++ b/shared/logging/tests/logging/test_structlog.py
@@ -390,3 +390,169 @@ def
test_logger_respects_configured_level(structlog_config):
written = sio.getvalue()
assert "[my_logger] Debug message\n" in written
+
+
+def test_excepthook_installed_when_json_output_true(structlog_config):
+ import sys
+
+ original = sys.excepthook
+ try:
+ with structlog_config(json_output=True):
+ assert sys.excepthook is not original
+ finally:
+ sys.excepthook = original
+
+
+def test_excepthook_not_installed_when_json_output_false(structlog_config):
+ import sys
+
+ original = sys.excepthook
+ with structlog_config(json_output=False):
+ assert sys.excepthook is original
+
+
+def
test_excepthook_routes_unhandled_exception_through_structlog(structlog_config):
+ import sys
+
+ original = sys.excepthook
+ try:
+ with structlog_config(json_output=True) as sio:
+ sys.excepthook(ValueError, ValueError("boom"), None)
+ output = sio.getvalue().decode()
+ assert "unhandled_exception" in output
+ assert "boom" in output
+ finally:
+ sys.excepthook = original
+
+
+def test_excepthook_passes_keyboard_interrupt_to_original():
+ import sys
+
+ from airflow_shared.logging.structlog import _install_excepthook
+
+ calls = []
+ original = sys.excepthook
+
+ def spy(et, ev, tb):
+ calls.append(et)
+
+ sys.excepthook = spy
+ try:
+ _install_excepthook()
+ sys.excepthook(KeyboardInterrupt, KeyboardInterrupt(), None)
+ assert calls == [KeyboardInterrupt]
+ finally:
+ sys.excepthook = original
+
+
+class TestWarningsInterceptor:
+ @pytest.fixture(autouse=True)
+ def reset(self):
+ from airflow_shared.logging.structlog import _WarningsInterceptor
+
+ _WarningsInterceptor.reset()
+ yield
+ _WarningsInterceptor.reset()
+
+ def test_register_replaces_showwarning(self):
+ import warnings
+
+ from airflow_shared.logging.structlog import _WarningsInterceptor
+
+ current = warnings.showwarning
+ sentinel = mock.MagicMock()
+ _WarningsInterceptor.register(sentinel)
+ assert warnings.showwarning is sentinel
+ assert _WarningsInterceptor._original_showwarning is current
+
+ def test_register_is_idempotent(self):
+ import warnings
+
+ from airflow_shared.logging.structlog import _WarningsInterceptor
+
+ pre_register = warnings.showwarning
+ _WarningsInterceptor.register(mock.MagicMock())
+ _WarningsInterceptor.register(mock.MagicMock())
+ assert _WarningsInterceptor._original_showwarning is pre_register
+
+ def test_reset_restores_original(self):
+ import warnings
+
+ from airflow_shared.logging.structlog import _WarningsInterceptor
+
+ pre_register = warnings.showwarning
+ _WarningsInterceptor.register(mock.MagicMock())
+ _WarningsInterceptor.reset()
+ assert warnings.showwarning is pre_register
+ assert _WarningsInterceptor._original_showwarning is None
+
+ def test_reset_when_not_registered_is_noop(self):
+ import warnings
+
+ from airflow_shared.logging.structlog import _WarningsInterceptor
+
+ pre_reset = warnings.showwarning
+ _WarningsInterceptor.reset()
+ assert warnings.showwarning is pre_reset
+
+ def test_emit_warning_delegates_to_original(self):
+ from airflow_shared.logging.structlog import _WarningsInterceptor
+
+ sentinel = mock.MagicMock()
+ _WarningsInterceptor._original_showwarning = sentinel
+ _WarningsInterceptor.emit_warning("msg", UserWarning, "file.py", 1)
+ sentinel.assert_called_once_with("msg", UserWarning, "file.py", 1)
+ _WarningsInterceptor._original_showwarning = None
+
+ def test_emit_warning_when_not_registered_is_noop(self):
+ from airflow_shared.logging.structlog import _WarningsInterceptor
+
+ _WarningsInterceptor._original_showwarning = None
+ _WarningsInterceptor.emit_warning("msg", UserWarning, "file.py", 1)
+
+
+class TestShowwarning:
+ @pytest.fixture(autouse=True)
+ def reset(self):
+ from airflow_shared.logging.structlog import _WarningsInterceptor
+
+ _WarningsInterceptor.reset()
+ yield
+ _WarningsInterceptor.reset()
+
+ def test_with_file_delegates_to_original(self):
+ from airflow_shared.logging.structlog import _showwarning,
_WarningsInterceptor
+
+ sentinel = mock.MagicMock()
+ _WarningsInterceptor._original_showwarning = sentinel
+ fake_file = mock.MagicMock()
+ _showwarning("msg", UserWarning, "file.py", 42, file=fake_file)
+ sentinel.assert_called_once_with("msg", UserWarning, "file.py", 42,
fake_file, None)
+ _WarningsInterceptor._original_showwarning = None
+
+ def test_without_file_logs_to_structlog(self):
+ from airflow_shared.logging.structlog import _showwarning
+
+ with structlog.testing.capture_logs() as captured:
+ _showwarning("deprecated feature", DeprecationWarning,
"myfile.py", 10)
+
+ assert len(captured) == 1
+ event = captured[0]
+ assert event["log_level"] == "warning"
+ assert event["event"] == "deprecated feature"
+ assert event["category"] == "DeprecationWarning"
+ assert event["filename"] == "myfile.py"
+ assert event["lineno"] == 10
+
+ def test_without_file_uses_py_warnings_logger(self):
+ from airflow_shared.logging import structlog as structlog_module
+ from airflow_shared.logging.structlog import _showwarning
+
+ with mock.patch.object(structlog_module.structlog, "get_logger") as
mock_get_logger:
+ mock_bound = mock.MagicMock()
+ mock_bound.bind.return_value = mock_bound
+ mock_get_logger.return_value = mock_bound
+ with mock.patch.object(structlog_module, "reconfigure_logger",
return_value=mock_bound):
+ _showwarning("some warning", UserWarning, "foo.py", 1)
+
+ mock_get_logger.assert_called_once_with("py.warnings")
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index a965e34b8ce..f93abe1a722 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -17,8 +17,6 @@
# under the License.
from __future__ import annotations
-import warnings
-from collections.abc import Callable
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO, TextIO
@@ -55,29 +53,6 @@ class _ActiveLoggingConfig:
cls.logging_config_loaded = True
-class _WarningsInterceptor:
- """A class to hold the reference to the original warnings.showwarning
function."""
-
- _original_showwarning: Callable | None = None
-
- @staticmethod
- def register(new_callable: Callable) -> None:
- if _WarningsInterceptor._original_showwarning is None:
- _WarningsInterceptor._original_showwarning = warnings.showwarning
- warnings.showwarning = new_callable
-
- @staticmethod
- def reset() -> None:
- if _WarningsInterceptor._original_showwarning is not None:
- warnings.showwarning = _WarningsInterceptor._original_showwarning
- _WarningsInterceptor._original_showwarning = None
-
- @staticmethod
- def emit_warning(*args: Any) -> None:
- if _WarningsInterceptor._original_showwarning is not None:
- _WarningsInterceptor._original_showwarning(*args)
-
-
def mask_logs(logger: Any, method_name: str, event_dict: EventDict) ->
EventDict:
event_dict = redact(event_dict) # type: ignore[assignment]
return event_dict
@@ -158,8 +133,6 @@ def configure_logging(
callsite_parameters=callsite_params,
)
- _WarningsInterceptor.register(_showwarning)
-
def logger_at_level(name: str, level: int) -> Logger:
"""Create a new logger at the given level."""
@@ -298,37 +271,8 @@ def reset_logging():
:meta private:
"""
- from airflow.sdk._shared.logging.structlog import structlog_processors
+ from airflow.sdk._shared.logging.structlog import _WarningsInterceptor,
structlog_processors
_WarningsInterceptor.reset()
structlog_processors.cache_clear()
logging_processors.cache_clear()
-
-
-def _showwarning(
- message: Warning | str,
- category: type[Warning],
- filename: str,
- lineno: int,
- file: TextIO | None = None,
- line: str | None = None,
-) -> Any:
- """
- Redirects warnings to structlog so they appear in task logs etc.
-
- Implementation of showwarnings which redirects to logging, which will first
- check to see if the file parameter is None. If a file is specified, it will
- delegate to the original warnings implementation of showwarning. Otherwise,
- it will call warnings.formatwarning and will log the resulting string to a
- warnings logger named "py.warnings" with level logging.WARNING.
- """
- if file is not None:
- _WarningsInterceptor.emit_warning(message, category, filename, lineno,
file, line)
- else:
- from airflow.sdk._shared.logging.structlog import reconfigure_logger
-
- log = reconfigure_logger(
- structlog.get_logger("py.warnings").bind(),
structlog.processors.CallsiteParameterAdder
- )
-
- log.warning(str(message), category=category.__name__,
filename=filename, lineno=lineno)