kaxil commented on code in PR #63365:
URL: https://github.com/apache/airflow/pull/63365#discussion_r2920212020
##########
airflow-core/src/airflow/cli/commands/api_server_command.py:
##########
@@ -157,41 +141,51 @@ def _run_api_server(args, apps: str, num_workers: int,
worker_timeout: int, prox
"""Run the API server using the configured server type."""
server_type = conf.get("api", "server_type", fallback="uvicorn").lower()
+ run = _run_api_server_with_uvicorn
if server_type == "gunicorn":
try:
import gunicorn # noqa: F401
+
+ run = _run_api_server_with_gunicorn
except ImportError:
+ server_type = "uvicorn"
raise AirflowConfigException(
Review Comment:
This `server_type = "uvicorn"` assignment is dead code. The `raise` on the
next line executes unconditionally, so the reassigned value is never observed.
Removing it would reduce confusion.
##########
shared/logging/src/airflow_shared/logging/structlog.py:
##########
@@ -539,7 +539,7 @@ def is_atty():
"level": log_level.upper(),
"class": "logging.StreamHandler",
"formatter": "structlog",
- "stream": output,
+ "stream": output if output is not None else sys.stdout,
Review Comment:
This changes the default stream for the stdlib logging handler from `None`
(which `logging.StreamHandler` interprets as `sys.stderr`) to `sys.stdout`.
That's a behavioral change for every process using this shared logging setup,
not just the API server.
If this is intentional, it is worth a note in the PR description, since it
could affect log-collection pipelines that separate stdout from stderr.
##########
shared/logging/src/airflow_shared/logging/structlog.py:
##########
@@ -560,6 +560,79 @@ def is_atty():
logging.config.dictConfig(config)
+ if json_output:
+ _install_excepthook()
+
+ _WarningsInterceptor.register(_showwarning)
+
+
+class _WarningsInterceptor:
+ """Holds a reference to the original ``warnings.showwarning`` so it can be
restored."""
+
+ _original_showwarning: Callable | None = None
+
+ @staticmethod
+ def register(new_callable: Callable) -> None:
+ import warnings
+
+ if _WarningsInterceptor._original_showwarning is None:
+ _WarningsInterceptor._original_showwarning = warnings.showwarning
+ warnings.showwarning = new_callable
+
+ @staticmethod
+ def reset() -> None:
+ import warnings
+
+ if _WarningsInterceptor._original_showwarning is not None:
+ warnings.showwarning = _WarningsInterceptor._original_showwarning
+ _WarningsInterceptor._original_showwarning = None
+
+ @staticmethod
+ def emit_warning(*args: Any) -> None:
+ if _WarningsInterceptor._original_showwarning is not None:
+ _WarningsInterceptor._original_showwarning(*args)
+
+
+def _showwarning(
+ message: Warning | str,
+ category: type[Warning],
+ filename: str,
+ lineno: int,
+ file: Any = None,
Review Comment:
The `file` parameter is typed as `Any` here. The original in task-sdk used
`TextIO | None`, which is more precise and matches the `warnings.showwarning`
protocol. Worth keeping the stricter type.
##########
airflow-core/src/airflow/utils/helpers.py:
##########
@@ -63,7 +63,7 @@ def validate_key(k: str, max_length: int = 250):
)
-def ask_yesno(question: str, default: bool | None = None) -> bool:
+def ask_yesno(question: str, default: bool | None = None, print=print) -> bool:
Review Comment:
Naming a parameter `print` shadows the builtin. It works (the default
captures the builtin at definition time), but it is easy to trip over. More
importantly, when `log.info` is passed as `print`, the
`print("Please respond with y/yes or n/no.")` call on line 80 will emit a
structured log event for what should be an interactive terminal prompt.
Consider renaming the parameter to `print_fn` or `output_fn`.
##########
airflow-core/src/airflow/api_fastapi/gunicorn_app.py:
##########
@@ -30,22 +30,56 @@
from __future__ import annotations
-import logging
import signal
import sys
import time
from typing import TYPE_CHECKING, Any
+import structlog
from gunicorn.app.base import BaseApplication
from gunicorn.arbiter import Arbiter
+from gunicorn.glogging import Logger as GunicornLogger
+from uvicorn.workers import UvicornWorker
from airflow.configuration import conf
if TYPE_CHECKING:
from fastapi import FastAPI
from gunicorn.app.base import Application
-log = logging.getLogger(__name__)
+log = structlog.get_logger(__name__)
+
+
+class AirflowGunicornLogger(GunicornLogger):
+ """
+ Gunicorn logger that routes all output through Airflow's logging setup.
+
+ Gunicorn's default Logger.setup() installs its own StreamHandler with a
custom
+ formatter on ``gunicorn.error`` and ``gunicorn.access``, bypassing any
root-level
+ handler (including our structlog ProcessorFormatter). Overriding setup()
to do
+ nothing lets records propagate to root where Airflow's handler picks them
up.
+ """
+
+ def setup(self, cfg) -> None:
Review Comment:
Because `super().setup(cfg)` is never called here, gunicorn's `--loglevel`
setting is never applied to `gunicorn.error`/`gunicorn.access`. Records will
still propagate to root regardless of that setting, so a user passing
`--loglevel error` may still see debug-level gunicorn messages if the root
logger's level is lower.
Adding `self.error_log.setLevel(cfg.loglevel)` here would preserve that
filtering.
##########
airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py:
##########
@@ -433,7 +433,13 @@ def _generate_password() -> str:
@staticmethod
def _print_output(output: str):
- name = "Simple auth manager"
- colorized_name = colored(f"{name:10}", "white")
- for line in output.splitlines():
- print(f"{colorized_name} | {line.strip()}")
+ from airflow.configuration import conf
Review Comment:
`conf` is already imported at the top of this file (line 42: `from
airflow.configuration import AIRFLOW_HOME, conf`). This local import is
redundant — just use the existing module-level `conf` directly.
##########
airflow-core/src/airflow/api_fastapi/core_api/app.py:
##########
@@ -165,15 +165,18 @@ def init_error_handlers(app: FastAPI) -> None:
def init_middlewares(app: FastAPI) -> None:
from airflow.api_fastapi.auth.middlewares.refresh_token import
JWTRefreshMiddleware
+ from airflow.api_fastapi.common.http_access_log import
HttpAccessLogMiddleware
Review Comment:
`HttpAccessLogMiddleware` is imported here but the module
`airflow.api_fastapi.common.http_access_log` doesn't exist in the repository or
in this PR's file list. This will be an `ImportError` at startup. Is this file
meant to be part of this PR, or is it coming in a separate one? If separate,
this import needs to be guarded or deferred until that file lands.
##########
shared/logging/src/airflow_shared/logging/structlog.py:
##########
@@ -560,6 +560,79 @@ def is_atty():
logging.config.dictConfig(config)
+ if json_output:
+ _install_excepthook()
+
+ _WarningsInterceptor.register(_showwarning)
+
+
+class _WarningsInterceptor:
+ """Holds a reference to the original ``warnings.showwarning`` so it can be
restored."""
+
+ _original_showwarning: Callable | None = None
+
+ @staticmethod
+ def register(new_callable: Callable) -> None:
+ import warnings
+
+ if _WarningsInterceptor._original_showwarning is None:
+ _WarningsInterceptor._original_showwarning = warnings.showwarning
+ warnings.showwarning = new_callable
+
+ @staticmethod
+ def reset() -> None:
+ import warnings
+
+ if _WarningsInterceptor._original_showwarning is not None:
+ warnings.showwarning = _WarningsInterceptor._original_showwarning
+ _WarningsInterceptor._original_showwarning = None
+
+ @staticmethod
+ def emit_warning(*args: Any) -> None:
+ if _WarningsInterceptor._original_showwarning is not None:
+ _WarningsInterceptor._original_showwarning(*args)
+
+
+def _showwarning(
+ message: Warning | str,
+ category: type[Warning],
+ filename: str,
+ lineno: int,
+ file: Any = None,
+ line: str | None = None,
+) -> None:
+ """
+ Redirect Python warnings to structlog.
+
+ If ``file`` is not None the warning is forwarded to the original handler
+ (e.g. when warnings are written directly to a file handle). Otherwise the
+ warning is emitted as a structured log event on the ``py.warnings`` logger
+ so it flows through the same processor chain as all other log output.
+ """
+ if file is not None:
+ _WarningsInterceptor.emit_warning(message, category, filename, lineno,
file, line)
+ else:
+ warning_log = reconfigure_logger(
+ structlog.get_logger("py.warnings").bind(),
+ structlog.processors.CallsiteParameterAdder,
+ )
+ warning_log.warning(str(message), category=category.__name__,
filename=filename, lineno=lineno)
+
+
+def _install_excepthook() -> None:
Review Comment:
`_install_excepthook` has no corresponding "uninstall" or "reset" mechanism.
`_WarningsInterceptor` has `reset()`, but if
`configure_logging(json_output=True)` is called more than once (common in
tests), each call wraps the previous hook in another closure. The
`KeyboardInterrupt` path would then call into the previous structlog hook
rather than the real original.
Consider adding a guard (install-once flag) or an uninstaller similar to
`_WarningsInterceptor.reset()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]