This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c9e39d26893 Use structured logs to ensure log messages go to stdout (#58541)
c9e39d26893 is described below
commit c9e39d26893ae2fdaef976a88ec50c28dac95550
Author: Jed Cunningham <[email protected]>
AuthorDate: Mon Nov 24 08:12:49 2025 -0700
Use structured logs to ensure log messages go to stdout (#58541)
Migrate LocalExecutor, ExecutorLoader, and the log server to
structured logging (structlog), because the stdlib root logger writes
to stderr instead of stdout. We are moving everything over to
structlog anyway, so this is also more consistent.
---
airflow-core/src/airflow/executors/executor_loader.py | 5 +++--
airflow-core/src/airflow/executors/local_executor.py | 9 ++++++---
airflow-core/src/airflow/utils/serve_logs/core.py | 4 ++--
3 files changed, 11 insertions(+), 7 deletions(-)
diff --git a/airflow-core/src/airflow/executors/executor_loader.py b/airflow-core/src/airflow/executors/executor_loader.py
index 5ea355b30a8..5c86c3020bc 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -18,11 +18,12 @@
from __future__ import annotations
-import logging
import os
from collections import defaultdict
from typing import TYPE_CHECKING
+import structlog
+
from airflow.exceptions import AirflowConfigException, UnknownExecutorException
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
@@ -35,7 +36,7 @@ from airflow.executors.executor_utils import ExecutorName
from airflow.models.team import Team
from airflow.utils.module_loading import import_string
-log = logging.getLogger(__name__)
+log = structlog.get_logger(__name__)
if TYPE_CHECKING:
from airflow.executors.base_executor import BaseExecutor
diff --git a/airflow-core/src/airflow/executors/local_executor.py b/airflow-core/src/airflow/executors/local_executor.py
index 1df402213d3..83d5ccc9c5d 100644
--- a/airflow-core/src/airflow/executors/local_executor.py
+++ b/airflow-core/src/airflow/executors/local_executor.py
@@ -26,7 +26,6 @@ LocalExecutor.
from __future__ import annotations
import ctypes
-import logging
import multiprocessing
import multiprocessing.sharedctypes
import os
@@ -34,6 +33,8 @@ import sys
from multiprocessing import Queue, SimpleQueue
from typing import TYPE_CHECKING
+import structlog
+
from airflow.executors import workloads
from airflow.executors.base_executor import BaseExecutor
from airflow.utils.state import TaskInstanceState
@@ -47,6 +48,8 @@ else:
setproctitle = lambda title, logger: real_setproctitle(title)
if TYPE_CHECKING:
+ from structlog.typing import FilteringBoundLogger as Logger
+
TaskInstanceStateType = tuple[workloads.TaskInstance, TaskInstanceState, Exception | None]
@@ -61,7 +64,7 @@ def _run_worker(
# Ignore ctrl-c in this process -- we don't want to kill _this_ one. we let tasks run to completion
signal.signal(signal.SIGINT, signal.SIG_IGN)
- log = logging.getLogger(logger_name)
+ log = structlog.get_logger(logger_name)
log.info("Worker starting up pid=%d", os.getpid())
while True:
@@ -101,7 +104,7 @@ def _run_worker(
output.put((key, TaskInstanceState.FAILED, e))
-def _execute_work(log: logging.Logger, workload: workloads.ExecuteTask) -> None:
+def _execute_work(log: Logger, workload: workloads.ExecuteTask) -> None:
"""
Execute command received and stores result state in queue.
diff --git a/airflow-core/src/airflow/utils/serve_logs/core.py b/airflow-core/src/airflow/utils/serve_logs/core.py
index f7dcade8d5f..ddd44c5bc8f 100644
--- a/airflow-core/src/airflow/utils/serve_logs/core.py
+++ b/airflow-core/src/airflow/utils/serve_logs/core.py
@@ -18,15 +18,15 @@
from __future__ import annotations
-import logging
import socket
import sys
+import structlog
import uvicorn
from airflow.configuration import conf
-logger = logging.getLogger(__name__)
+logger = structlog.get_logger(__name__)
def serve_logs(port=None):