kaxil commented on code in PR #63365:
URL: https://github.com/apache/airflow/pull/63365#discussion_r2920212020


##########
airflow-core/src/airflow/cli/commands/api_server_command.py:
##########
@@ -157,41 +141,51 @@ def _run_api_server(args, apps: str, num_workers: int, worker_timeout: int, prox
     """Run the API server using the configured server type."""
     server_type = conf.get("api", "server_type", fallback="uvicorn").lower()
 
+    run = _run_api_server_with_uvicorn
     if server_type == "gunicorn":
         try:
             import gunicorn  # noqa: F401
+
+            run = _run_api_server_with_gunicorn
         except ImportError:
+            server_type = "uvicorn"
             raise AirflowConfigException(

Review Comment:
   This `server_type = "uvicorn"` assignment is dead code. The `raise` on the 
next line executes unconditionally, so the reassigned value is never observed. 
Removing it would reduce confusion.
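   A minimal sketch of the cleaned-up branch (runner names copied from the diff but stubbed as strings, and a generic exception standing in for `AirflowConfigException`, so the snippet runs standalone):

   ```python
   def _select_runner(server_type: str):
       # Default to uvicorn; switch to gunicorn only if the import succeeds.
       run = "_run_api_server_with_uvicorn"  # stand-in for the real callable
       if server_type == "gunicorn":
           try:
               import gunicorn  # noqa: F401

               run = "_run_api_server_with_gunicorn"
           except ImportError:
               # No `server_type = "uvicorn"` reassignment: the raise executes
               # unconditionally, so any reassignment here is dead code.
               raise RuntimeError("server_type is 'gunicorn' but gunicorn is not installed")
       return run
   ```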



##########
shared/logging/src/airflow_shared/logging/structlog.py:
##########
@@ -539,7 +539,7 @@ def is_atty():
                 "level": log_level.upper(),
                 "class": "logging.StreamHandler",
                 "formatter": "structlog",
-                "stream": output,
+                "stream": output if output is not None else sys.stdout,

Review Comment:
   This changes the default stream for the stdlib logging handler from `None` 
(which `logging.StreamHandler` interprets as `sys.stderr`) to `sys.stdout`. 
That's a behavioral change for every process using this shared logging setup, 
not just the API server.
   
   If intentional, worth a note in the PR description since it could affect log 
collection pipelines that separate stdout from stderr.
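   For reference, the stdlib default the comment relies on can be checked directly (plain `logging`, independent of this PR):

   ```python
   import logging
   import sys

   # A StreamHandler constructed with stream=None writes to sys.stderr, so
   # switching the dictConfig default to sys.stdout is an observable change.
   handler = logging.StreamHandler(stream=None)
   assert handler.stream is sys.stderr
   ```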



##########
shared/logging/src/airflow_shared/logging/structlog.py:
##########
@@ -560,6 +560,79 @@ def is_atty():
 
     logging.config.dictConfig(config)
 
+    if json_output:
+        _install_excepthook()
+
+    _WarningsInterceptor.register(_showwarning)
+
+
+class _WarningsInterceptor:
+    """Holds a reference to the original ``warnings.showwarning`` so it can be 
restored."""
+
+    _original_showwarning: Callable | None = None
+
+    @staticmethod
+    def register(new_callable: Callable) -> None:
+        import warnings
+
+        if _WarningsInterceptor._original_showwarning is None:
+            _WarningsInterceptor._original_showwarning = warnings.showwarning
+        warnings.showwarning = new_callable
+
+    @staticmethod
+    def reset() -> None:
+        import warnings
+
+        if _WarningsInterceptor._original_showwarning is not None:
+            warnings.showwarning = _WarningsInterceptor._original_showwarning
+            _WarningsInterceptor._original_showwarning = None
+
+    @staticmethod
+    def emit_warning(*args: Any) -> None:
+        if _WarningsInterceptor._original_showwarning is not None:
+            _WarningsInterceptor._original_showwarning(*args)
+
+
+def _showwarning(
+    message: Warning | str,
+    category: type[Warning],
+    filename: str,
+    lineno: int,
+    file: Any = None,

Review Comment:
   The `file` parameter is typed as `Any` here. The original in task-sdk used 
`TextIO | None`, which is more precise and matches the `warnings.showwarning` 
protocol. Worth keeping the stricter type.
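   A sketch of the stricter signature (body elided; only the `file` annotation differs from the diff):

   ```python
   from __future__ import annotations

   from typing import TextIO


   def _showwarning(
       message: Warning | str,
       category: type[Warning],
       filename: str,
       lineno: int,
       file: TextIO | None = None,  # matches the warnings.showwarning protocol
       line: str | None = None,
   ) -> None:
       """Placeholder body; the real implementation forwards or logs the warning."""
   ```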



##########
airflow-core/src/airflow/utils/helpers.py:
##########
@@ -63,7 +63,7 @@ def validate_key(k: str, max_length: int = 250):
         )
 
 
-def ask_yesno(question: str, default: bool | None = None) -> bool:
+def ask_yesno(question: str, default: bool | None = None, print=print) -> bool:

Review Comment:
   Naming a parameter `print` shadows the builtin. It works (the default 
captures the builtin at definition time), but it's easy to trip over. More 
importantly, when `log.info` is passed as `print`, line 80's `print("Please 
respond with y/yes or n/no.")` will emit a structured log event for what should 
be an interactive terminal prompt.
   
   Consider renaming to `print_fn` or `output_fn`.
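   A sketch of the rename (the `input_fn` parameter is an addition here purely for testability; the real helper reads from `input()` directly, and the loop is simplified relative to the actual function):

   ```python
   from __future__ import annotations


   def ask_yesno(question: str, default: bool | None = None, print_fn=print, input_fn=input) -> bool:
       # `print_fn` replaces the `print` parameter name so the builtin is not shadowed.
       print_fn(question)
       while True:
           choice = input_fn().strip().lower()
           if choice == "" and default is not None:
               return default
           if choice in ("y", "yes"):
               return True
           if choice in ("n", "no"):
               return False
           print_fn("Please respond with y/yes or n/no.")
   ```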



##########
airflow-core/src/airflow/api_fastapi/gunicorn_app.py:
##########
@@ -30,22 +30,56 @@
 
 from __future__ import annotations
 
-import logging
 import signal
 import sys
 import time
 from typing import TYPE_CHECKING, Any
 
+import structlog
 from gunicorn.app.base import BaseApplication
 from gunicorn.arbiter import Arbiter
+from gunicorn.glogging import Logger as GunicornLogger
+from uvicorn.workers import UvicornWorker
 
 from airflow.configuration import conf
 
 if TYPE_CHECKING:
     from fastapi import FastAPI
     from gunicorn.app.base import Application
 
-log = logging.getLogger(__name__)
+log = structlog.get_logger(__name__)
+
+
+class AirflowGunicornLogger(GunicornLogger):
+    """
+    Gunicorn logger that routes all output through Airflow's logging setup.
+
+    Gunicorn's default Logger.setup() installs its own StreamHandler with a custom
+    formatter on ``gunicorn.error`` and ``gunicorn.access``, bypassing any root-level
+    handler (including our structlog ProcessorFormatter). Overriding setup() to do
+    nothing lets records propagate to root where Airflow's handler picks them up.
+    """
+
+    def setup(self, cfg) -> None:

Review Comment:
   By not calling `super().setup(cfg)`, gunicorn's `--loglevel` setting is 
never applied to `gunicorn.error`/`gunicorn.access`. Records will propagate to 
root regardless, so a user setting `--loglevel error` might still see 
debug-level gunicorn messages if the root level is lower.
   
   Adding `self.error_log.setLevel(cfg.loglevel)` here would preserve that 
filtering.
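   One possible shape for the override (gunicorn's base `Logger` and its `cfg` object are stubbed here so the snippet runs standalone; in the PR the class subclasses `gunicorn.glogging.Logger`, whose `LOG_LEVELS` mapping this mirrors):

   ```python
   import logging


   class AirflowGunicornLogger:  # in the PR: AirflowGunicornLogger(GunicornLogger)
       LOG_LEVELS = {
           "debug": logging.DEBUG,
           "info": logging.INFO,
           "warning": logging.WARNING,
           "error": logging.ERROR,
           "critical": logging.CRITICAL,
       }

       def __init__(self):
           self.error_log = logging.getLogger("gunicorn.error")
           self.access_log = logging.getLogger("gunicorn.access")

       def setup(self, cfg) -> None:
           # Skip gunicorn's handler installation so records still propagate
           # to root, but honor --loglevel on the gunicorn loggers.
           level = self.LOG_LEVELS.get(str(cfg.loglevel).lower(), logging.INFO)
           self.error_log.setLevel(level)
           self.access_log.setLevel(level)
   ```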



##########
airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py:
##########
@@ -433,7 +433,13 @@ def _generate_password() -> str:
 
     @staticmethod
     def _print_output(output: str):
-        name = "Simple auth manager"
-        colorized_name = colored(f"{name:10}", "white")
-        for line in output.splitlines():
-            print(f"{colorized_name} | {line.strip()}")
+        from airflow.configuration import conf

Review Comment:
   `conf` is already imported at the top of this file (line 42: `from 
airflow.configuration import AIRFLOW_HOME, conf`). This local import is 
redundant — just use the existing module-level `conf` directly.



##########
airflow-core/src/airflow/api_fastapi/core_api/app.py:
##########
@@ -165,15 +165,18 @@ def init_error_handlers(app: FastAPI) -> None:
 
 def init_middlewares(app: FastAPI) -> None:
     from airflow.api_fastapi.auth.middlewares.refresh_token import JWTRefreshMiddleware
+    from airflow.api_fastapi.common.http_access_log import HttpAccessLogMiddleware

Review Comment:
   `HttpAccessLogMiddleware` is imported here but the module 
`airflow.api_fastapi.common.http_access_log` doesn't exist in the repository or 
in this PR's file list. This will be an `ImportError` at startup. Is this file 
meant to be part of this PR, or is it coming in a separate one? If separate, 
this import needs to be guarded or deferred until that file lands.
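   If the module is landing in a follow-up PR, a guarded import along these lines would keep startup working in the meantime (a sketch only; whether to guard or just defer the whole change is a judgment call for the PR author):

   ```python
   def init_middlewares(app) -> None:
       try:
           # Module expected from a follow-up PR; tolerate its absence until then.
           from airflow.api_fastapi.common.http_access_log import HttpAccessLogMiddleware
       except ImportError:
           HttpAccessLogMiddleware = None

       if HttpAccessLogMiddleware is not None:
           app.add_middleware(HttpAccessLogMiddleware)
   ```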



##########
shared/logging/src/airflow_shared/logging/structlog.py:
##########
@@ -560,6 +560,79 @@ def is_atty():
 
     logging.config.dictConfig(config)
 
+    if json_output:
+        _install_excepthook()
+
+    _WarningsInterceptor.register(_showwarning)
+
+
+class _WarningsInterceptor:
+    """Holds a reference to the original ``warnings.showwarning`` so it can be 
restored."""
+
+    _original_showwarning: Callable | None = None
+
+    @staticmethod
+    def register(new_callable: Callable) -> None:
+        import warnings
+
+        if _WarningsInterceptor._original_showwarning is None:
+            _WarningsInterceptor._original_showwarning = warnings.showwarning
+        warnings.showwarning = new_callable
+
+    @staticmethod
+    def reset() -> None:
+        import warnings
+
+        if _WarningsInterceptor._original_showwarning is not None:
+            warnings.showwarning = _WarningsInterceptor._original_showwarning
+            _WarningsInterceptor._original_showwarning = None
+
+    @staticmethod
+    def emit_warning(*args: Any) -> None:
+        if _WarningsInterceptor._original_showwarning is not None:
+            _WarningsInterceptor._original_showwarning(*args)
+
+
+def _showwarning(
+    message: Warning | str,
+    category: type[Warning],
+    filename: str,
+    lineno: int,
+    file: Any = None,
+    line: str | None = None,
+) -> None:
+    """
+    Redirect Python warnings to structlog.
+
+    If ``file`` is not None the warning is forwarded to the original handler
+    (e.g. when warnings are written directly to a file handle). Otherwise the
+    warning is emitted as a structured log event on the ``py.warnings`` logger
+    so it flows through the same processor chain as all other log output.
+    """
+    if file is not None:
+        _WarningsInterceptor.emit_warning(message, category, filename, lineno, file, line)
+    else:
+        warning_log = reconfigure_logger(
+            structlog.get_logger("py.warnings").bind(),
+            structlog.processors.CallsiteParameterAdder,
+        )
+        warning_log.warning(str(message), category=category.__name__, filename=filename, lineno=lineno)
+
+
+def _install_excepthook() -> None:

Review Comment:
   `_install_excepthook` has no corresponding "uninstall" or "reset" mechanism. 
`_WarningsInterceptor` has `reset()`, but if 
`configure_logging(json_output=True)` is called more than once (common in 
tests), each call wraps the previous hook in another closure. The 
`KeyboardInterrupt` path would then call into the previous structlog hook 
rather than the real original.
   
   Consider adding a guard (install-once flag) or an uninstaller similar to 
`_WarningsInterceptor.reset()`.
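   A minimal install-once sketch (the structured-log emission itself is elided, and the `_EXCEPTHOOK_INSTALLED` flag name is an assumption, not from the PR):

   ```python
   import sys

   _EXCEPTHOOK_INSTALLED = False


   def _install_excepthook() -> None:
       """Install-once variant: repeat calls are no-ops instead of re-wrapping."""
       global _EXCEPTHOOK_INSTALLED
       if _EXCEPTHOOK_INSTALLED:
           return
       _EXCEPTHOOK_INSTALLED = True

       original = sys.excepthook  # the genuinely-original hook, captured once

       def hook(exc_type, exc_value, tb):
           if issubclass(exc_type, KeyboardInterrupt):
               # Ctrl-C goes straight to the real original, never a stale wrapper.
               original(exc_type, exc_value, tb)
               return
           # ... emit the structured-log rendering here, then fall through ...
           original(exc_type, exc_value, tb)

       sys.excepthook = hook
   ```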



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to