This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/main by this push:
     new 933b0039f66 Replacing gunicornmontor with uvicorn.run() (#45103)
933b0039f66 is described below

commit 933b0039f6652937006d24796b6a5a69d24e850a
Author: vatsrahul1001 <43964496+vatsrahul1...@users.noreply.github.com>
AuthorDate: Fri Jan 24 14:33:08 2025 +0530

    Replacing gunicornmontor with uvicorn.run() (#45103)

    * replace gunicorm with uvicorn.run()

    * fixing tests

    * Daemonized fastapi server

    * fixing setproctitle format

    * updating setproctitle
---
 .../commands/local_commands/fastapi_api_command.py | 149 +++------------------
 .../local_commands/test_fastapi_api_command.py     | 115 ++--------------
 2 files changed, 33 insertions(+), 231 deletions(-)

diff --git a/airflow/cli/commands/local_commands/fastapi_api_command.py b/airflow/cli/commands/local_commands/fastapi_api_command.py
index e9fdd870916..cc66940648b 100644
--- a/airflow/cli/commands/local_commands/fastapi_api_command.py
+++ b/airflow/cli/commands/local_commands/fastapi_api_command.py
@@ -20,25 +20,16 @@ from __future__ import annotations

 import logging
 import os
-import signal
 import subprocess
-import sys
 import textwrap
-from contextlib import suppress
-from pathlib import Path
-from time import sleep
-from typing import NoReturn

-import psutil
-from lockfile.pidlockfile import read_pid_from_pidfile
-from uvicorn.workers import UvicornWorker
+import uvicorn
+from gunicorn.util import daemonize
+from setproctitle import setproctitle

 from airflow import settings
-from airflow.cli.commands.local_commands.daemon_utils import run_command_with_daemon_option
-from airflow.cli.commands.local_commands.webserver_command import GunicornMonitor
 from airflow.exceptions import AirflowConfigException
 from airflow.utils import cli as cli_utils
-from airflow.utils.cli import setup_locations
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded

 log = logging.getLogger(__name__)
@@ -47,8 +38,6 @@ log = logging.getLogger(__name__)
 # This shouldn't be necessary but there seems to be an issue in uvloop that causes bad file descriptor
 # errors when shutting down workers. Despite the 'closed' status of the issue it is not solved,
 # more info here: https://github.com/benoitc/gunicorn/issues/1877#issuecomment-1911136399
-AirflowUvicornWorker = UvicornWorker
-AirflowUvicornWorker.CONFIG_KWARGS = {"loop": "asyncio", "http": "auto"}


 @cli_utils.action_cli
@@ -59,18 +48,13 @@ def fastapi_api(args):

     apps = args.apps
     access_logfile = args.access_logfile or "-"
-    error_logfile = args.error_logfile or "-"
     access_logformat = args.access_logformat
     num_workers = args.workers
     worker_timeout = args.worker_timeout

-    worker_class = "airflow.cli.commands.local_commands.fastapi_api_command.AirflowUvicornWorker"
-
-    from airflow.api_fastapi.app import create_app
-
     if args.debug:
         print(f"Starting the FastAPI API server on port {args.port} and host {args.hostname} debug.")
-        log.warning("Running in dev mode, ignoring gunicorn args")
+        log.warning("Running in dev mode, ignoring uvicorn args")

         run_args = [
             "fastapi",
@@ -93,124 +77,35 @@ def fastapi_api(args):
             process.wait()
         os.environ.pop("AIRFLOW_API_APPS")
     else:
+        if args.daemon:
+            daemonize()
+            log.info("Daemonized the FastAPI API server process PID: %s", os.getpid())
+
         log.info(
             textwrap.dedent(
                 f"""\
-                Running the Gunicorn Server with:
+                Running the uvicorn with:
                 Apps: {apps}
-                Workers: {num_workers} {worker_class}
+                Workers: {num_workers}
                 Host: {args.hostname}:{args.port}
                 Timeout: {worker_timeout}
-                Logfiles: {access_logfile} {error_logfile}
+                Logfiles: {access_logfile}
                 Access Logformat: {access_logformat}
                 ================================================================="""
             )
         )
-
-        pid_file, _, _, _ = setup_locations("fastapi-api", pid=args.pid)
-
-        run_args = [
-            sys.executable,
-            "-m",
-            "gunicorn",
-            "--workers",
-            str(num_workers),
-            "--worker-class",
-            str(worker_class),
-            "--timeout",
-            str(worker_timeout),
-            "--bind",
-            args.hostname + ":" + str(args.port),
-            "--name",
-            "airflow-fastapi-api",
-            "--pid",
-            pid_file,
-            "--access-logfile",
-            str(access_logfile),
-            "--error-logfile",
-            str(error_logfile),
-            "--config",
-            "python:airflow.api_fastapi.gunicorn_config",
-        ]
-
         ssl_cert, ssl_key = _get_ssl_cert_and_key_filepaths(args)
-        if ssl_cert and ssl_key:
-            run_args += ["--certfile", ssl_cert, "--keyfile", ssl_key]
-
-        if args.access_logformat and args.access_logformat.strip():
-            run_args += ["--access-logformat", str(args.access_logformat)]
-
-        if args.daemon:
-            run_args += ["--daemon"]
-
-        run_args += [f"airflow.api_fastapi.app:cached_app(apps='{apps}')"]
-
-        # To prevent different workers creating the web app and
-        # all writing to the database at the same time, we use the --preload option.
-        # With the preload option, the app is loaded before the workers are forked, and each worker will
-        # then have a copy of the app
-        run_args += ["--preload"]
-
-        def kill_proc(signum: int, gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
-            log.info("Received signal: %s. Closing gunicorn.", signum)
-            gunicorn_master_proc.terminate()
-            with suppress(TimeoutError):
-                gunicorn_master_proc.wait(timeout=30)
-            if isinstance(gunicorn_master_proc, subprocess.Popen):
-                still_running = gunicorn_master_proc.poll() is not None
-            else:
-                still_running = gunicorn_master_proc.is_running()
-            if still_running:
-                gunicorn_master_proc.kill()
-            sys.exit(0)
-
-        def monitor_gunicorn(gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
-            # Register signal handlers
-            signal.signal(signal.SIGINT, lambda signum, _: kill_proc(signum, gunicorn_master_proc))
-            signal.signal(signal.SIGTERM, lambda signum, _: kill_proc(signum, gunicorn_master_proc))
-
-            # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
-            GunicornMonitor(
-                gunicorn_master_pid=gunicorn_master_proc.pid,
-                num_workers_expected=num_workers,
-                master_timeout=120,
-                worker_refresh_interval=30,
-                worker_refresh_batch_size=1,
-                reload_on_plugin_change=False,
-            ).start()
-
-        def start_and_monitor_gunicorn(args):
-            if args.daemon:
-                subprocess.Popen(run_args, close_fds=True)
-
-                # Reading pid of gunicorn master as it will be different that
-                # the one of process spawned above.
-                gunicorn_master_proc_pid = None
-                while not gunicorn_master_proc_pid:
-                    sleep(0.1)
-                    gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file)
-
-                # Run Gunicorn monitor
-                gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
-                monitor_gunicorn(gunicorn_master_proc)
-            else:
-                with subprocess.Popen(run_args, close_fds=True) as gunicorn_master_proc:
-                    monitor_gunicorn(gunicorn_master_proc)
-
-        if args.daemon:
-            # This makes possible errors get reported before daemonization
-            os.environ["SKIP_DAGS_PARSING"] = "True"
-            create_app(apps)
-            os.environ.pop("SKIP_DAGS_PARSING")
-
-        pid_file_path = Path(pid_file)
-        monitor_pid_file = str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}"))
-        run_command_with_daemon_option(
-            args=args,
-            process_name="fastapi-api",
-            callback=lambda: start_and_monitor_gunicorn(args),
-            should_setup_logging=True,
-            pid_file=monitor_pid_file,
+        setproctitle(f"airflow fastapi_api -- host:{args.hostname} port:{args.port}")
+        uvicorn.run(
+            "airflow.api_fastapi.main:app",
+            host=args.hostname,
+            port=args.port,
+            workers=num_workers,
+            timeout_keep_alive=worker_timeout,
+            timeout_graceful_shutdown=worker_timeout,
+            ssl_keyfile=ssl_key,
+            ssl_certfile=ssl_cert,
+            access_log=access_logfile,
         )


diff --git a/tests/cli/commands/local_commands/test_fastapi_api_command.py b/tests/cli/commands/local_commands/test_fastapi_api_command.py
index ffa0b8c98b5..ea5339cfb3e 100644
--- a/tests/cli/commands/local_commands/test_fastapi_api_command.py
+++ b/tests/cli/commands/local_commands/test_fastapi_api_command.py
@@ -16,10 +16,6 @@
 # under the License.
 from __future__ import annotations

-import os
-import subprocess
-import sys
-import time
 from unittest import mock

 import pytest
@@ -37,73 +33,9 @@ console = Console(width=400, color_system="standard")
 class TestCliFastAPI(_CommonCLIGunicornTestClass):
     main_process_regexp = r"airflow fastapi-api"

-    @pytest.mark.execution_timeout(210)
-    def test_cli_fastapi_api_background(self, tmp_path):
-        parent_path = tmp_path / "gunicorn"
-        parent_path.mkdir()
-        pidfile_fastapi_api = parent_path / "pidflow-fastapi-api.pid"
-        pidfile_monitor = parent_path / "pidflow-fastapi-api-monitor.pid"
-        stdout = parent_path / "airflow-fastapi-api.out"
-        stderr = parent_path / "airflow-fastapi-api.err"
-        logfile = parent_path / "airflow-fastapi-api.log"
-        try:
-            # Run fastapi-api as daemon in background. Note that the wait method is not called.
-            console.print("[magenta]Starting airflow fastapi-api --daemon")
-            env = os.environ.copy()
-            proc = subprocess.Popen(
-                [
-                    "airflow",
-                    "fastapi-api",
-                    "--daemon",
-                    "--pid",
-                    os.fspath(pidfile_fastapi_api),
-                    "--stdout",
-                    os.fspath(stdout),
-                    "--stderr",
-                    os.fspath(stderr),
-                    "--log-file",
-                    os.fspath(logfile),
-                ],
-                env=env,
-            )
-            assert proc.poll() is None
-
-            pid_monitor = self._wait_pidfile(pidfile_monitor)
-            console.print(f"[blue]Monitor started at {pid_monitor}")
-            pid_fastapi_api = self._wait_pidfile(pidfile_fastapi_api)
-            console.print(f"[blue]FastAPI API started at {pid_fastapi_api}")
-            console.print("[blue]Running airflow fastapi-api process:")
-            # Assert that the fastapi-api and gunicorn processes are running (by name rather than pid).
-            assert self._find_process(r"airflow fastapi-api --daemon", print_found_process=True)
-            console.print("[blue]Waiting for gunicorn processes:")
-            # wait for gunicorn to start
-            for _ in range(30):
-                if self._find_process(r"^gunicorn"):
-                    break
-                console.print("[blue]Waiting for gunicorn to start ...")
-                time.sleep(1)
-            console.print("[blue]Running gunicorn processes:")
-            assert self._find_all_processes("^gunicorn", print_found_process=True)
-            console.print("[magenta]fastapi-api process started successfully.")
-            console.print(
-                "[magenta]Terminating monitor process and expect "
-                "fastapi-api and gunicorn processes to terminate as well"
-            )
-            self._terminate_multiple_process([pid_fastapi_api, pid_monitor])
-            self._check_processes(ignore_running=False)
-            console.print("[magenta]All fastapi-api and gunicorn processes are terminated.")
-        except Exception:
-            console.print("[red]Exception occurred. Dumping all logs.")
-            # Dump all logs
-            for file in parent_path.glob("*"):
-                console.print(f"Dumping {file} (size: {file.stat().st_size})")
-                console.print(file.read_text())
-            raise
-
     def test_cli_fastapi_api_debug(self, app):
         with (
             mock.patch("subprocess.Popen") as Popen,
-            mock.patch.object(fastapi_api_command, "GunicornMonitor"),
         ):
             port = "9092"
             hostname = "somehost"
@@ -130,7 +62,6 @@ class TestCliFastAPI(_CommonCLIGunicornTestClass):
         """
         with (
             mock.patch("subprocess.Popen") as Popen,
-            mock.patch.object(fastapi_api_command, "GunicornMonitor"),
             mock.patch("os.environ", autospec=True) as mock_environ,
         ):
             apps_value = "core,execution"
@@ -172,8 +103,7 @@ class TestCliFastAPI(_CommonCLIGunicornTestClass):
         cert_path, key_path = ssl_cert_and_key

         with (
-            mock.patch("subprocess.Popen") as Popen,
-            mock.patch.object(fastapi_api_command, "GunicornMonitor"),
+            mock.patch("uvicorn.run") as mock_run,
        ):
             args = self.parser.parse_args(
                 [
@@ -192,39 +122,16 @@ class TestCliFastAPI(_CommonCLIGunicornTestClass):
             )
             fastapi_api_command.fastapi_api(args)

-            Popen.assert_called_with(
-                [
-                    sys.executable,
-                    "-m",
-                    "gunicorn",
-                    "--workers",
-                    "4",
-                    "--worker-class",
-                    "airflow.cli.commands.local_commands.fastapi_api_command.AirflowUvicornWorker",
-                    "--timeout",
-                    "120",
-                    "--bind",
-                    "0.0.0.0:9091",
-                    "--name",
-                    "airflow-fastapi-api",
-                    "--pid",
-                    "/tmp/x.pid",
-                    "--access-logfile",
-                    "-",
-                    "--error-logfile",
-                    "-",
-                    "--config",
-                    "python:airflow.api_fastapi.gunicorn_config",
-                    "--certfile",
-                    str(cert_path),
-                    "--keyfile",
-                    str(key_path),
-                    "--access-logformat",
-                    "custom_log_format",
-                    "airflow.api_fastapi.app:cached_app(apps='core')",
-                    "--preload",
-                ],
-                close_fds=True,
+            mock_run.assert_called_with(
+                "airflow.api_fastapi.main:app",
+                host="0.0.0.0",
+                port=9091,
+                workers=4,
+                timeout_keep_alive=120,
+                timeout_graceful_shutdown=120,
+                ssl_keyfile=str(key_path),
+                ssl_certfile=str(cert_path),
+                access_log="-",
             )

     @pytest.mark.parametrize(