This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 933b0039f66 Replacing gunicornmontor with uvicorn.run() (#45103)
933b0039f66 is described below

commit 933b0039f6652937006d24796b6a5a69d24e850a
Author: vatsrahul1001 <43964496+vatsrahul1...@users.noreply.github.com>
AuthorDate: Fri Jan 24 14:33:08 2025 +0530

    Replacing gunicornmontor with uvicorn.run() (#45103)
    
    * replace gunicorm with uvicorn.run()
    
    * fixing tests
    
    * Daemonized fastapi server
    
    * fixing setproctitle format
    
    * updating setproctitle
---
 .../commands/local_commands/fastapi_api_command.py | 149 +++------------------
 .../local_commands/test_fastapi_api_command.py     | 115 ++--------------
 2 files changed, 33 insertions(+), 231 deletions(-)

diff --git a/airflow/cli/commands/local_commands/fastapi_api_command.py 
b/airflow/cli/commands/local_commands/fastapi_api_command.py
index e9fdd870916..cc66940648b 100644
--- a/airflow/cli/commands/local_commands/fastapi_api_command.py
+++ b/airflow/cli/commands/local_commands/fastapi_api_command.py
@@ -20,25 +20,16 @@ from __future__ import annotations
 
 import logging
 import os
-import signal
 import subprocess
-import sys
 import textwrap
-from contextlib import suppress
-from pathlib import Path
-from time import sleep
-from typing import NoReturn
 
-import psutil
-from lockfile.pidlockfile import read_pid_from_pidfile
-from uvicorn.workers import UvicornWorker
+import uvicorn
+from gunicorn.util import daemonize
+from setproctitle import setproctitle
 
 from airflow import settings
-from airflow.cli.commands.local_commands.daemon_utils import 
run_command_with_daemon_option
-from airflow.cli.commands.local_commands.webserver_command import 
GunicornMonitor
 from airflow.exceptions import AirflowConfigException
 from airflow.utils import cli as cli_utils
-from airflow.utils.cli import setup_locations
 from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 log = logging.getLogger(__name__)
@@ -47,8 +38,6 @@ log = logging.getLogger(__name__)
 # This shouldn't be necessary but there seems to be an issue in uvloop that 
causes bad file descriptor
 # errors when shutting down workers. Despite the 'closed' status of the issue 
it is not solved,
 # more info here: 
https://github.com/benoitc/gunicorn/issues/1877#issuecomment-1911136399
-AirflowUvicornWorker = UvicornWorker
-AirflowUvicornWorker.CONFIG_KWARGS = {"loop": "asyncio", "http": "auto"}
 
 
 @cli_utils.action_cli
@@ -59,18 +48,13 @@ def fastapi_api(args):
 
     apps = args.apps
     access_logfile = args.access_logfile or "-"
-    error_logfile = args.error_logfile or "-"
     access_logformat = args.access_logformat
     num_workers = args.workers
     worker_timeout = args.worker_timeout
 
-    worker_class = 
"airflow.cli.commands.local_commands.fastapi_api_command.AirflowUvicornWorker"
-
-    from airflow.api_fastapi.app import create_app
-
     if args.debug:
         print(f"Starting the FastAPI API server on port {args.port} and host 
{args.hostname} debug.")
-        log.warning("Running in dev mode, ignoring gunicorn args")
+        log.warning("Running in dev mode, ignoring uvicorn args")
 
         run_args = [
             "fastapi",
@@ -93,124 +77,35 @@ def fastapi_api(args):
             process.wait()
         os.environ.pop("AIRFLOW_API_APPS")
     else:
+        if args.daemon:
+            daemonize()
+            log.info("Daemonized the FastAPI API server process PID: %s", 
os.getpid())
+
         log.info(
             textwrap.dedent(
                 f"""\
-                Running the Gunicorn Server with:
+                Running the uvicorn with:
                 Apps: {apps}
-                Workers: {num_workers} {worker_class}
+                Workers: {num_workers}
                 Host: {args.hostname}:{args.port}
                 Timeout: {worker_timeout}
-                Logfiles: {access_logfile} {error_logfile}
+                Logfiles: {access_logfile}
                 Access Logformat: {access_logformat}
                 
================================================================="""
             )
         )
-
-        pid_file, _, _, _ = setup_locations("fastapi-api", pid=args.pid)
-
-        run_args = [
-            sys.executable,
-            "-m",
-            "gunicorn",
-            "--workers",
-            str(num_workers),
-            "--worker-class",
-            str(worker_class),
-            "--timeout",
-            str(worker_timeout),
-            "--bind",
-            args.hostname + ":" + str(args.port),
-            "--name",
-            "airflow-fastapi-api",
-            "--pid",
-            pid_file,
-            "--access-logfile",
-            str(access_logfile),
-            "--error-logfile",
-            str(error_logfile),
-            "--config",
-            "python:airflow.api_fastapi.gunicorn_config",
-        ]
-
         ssl_cert, ssl_key = _get_ssl_cert_and_key_filepaths(args)
-        if ssl_cert and ssl_key:
-            run_args += ["--certfile", ssl_cert, "--keyfile", ssl_key]
-
-        if args.access_logformat and args.access_logformat.strip():
-            run_args += ["--access-logformat", str(args.access_logformat)]
-
-        if args.daemon:
-            run_args += ["--daemon"]
-
-        run_args += [f"airflow.api_fastapi.app:cached_app(apps='{apps}')"]
-
-        # To prevent different workers creating the web app and
-        # all writing to the database at the same time, we use the --preload 
option.
-        # With the preload option, the app is loaded before the workers are 
forked, and each worker will
-        # then have a copy of the app
-        run_args += ["--preload"]
-
-        def kill_proc(signum: int, gunicorn_master_proc: psutil.Process | 
subprocess.Popen) -> NoReturn:
-            log.info("Received signal: %s. Closing gunicorn.", signum)
-            gunicorn_master_proc.terminate()
-            with suppress(TimeoutError):
-                gunicorn_master_proc.wait(timeout=30)
-            if isinstance(gunicorn_master_proc, subprocess.Popen):
-                still_running = gunicorn_master_proc.poll() is not None
-            else:
-                still_running = gunicorn_master_proc.is_running()
-            if still_running:
-                gunicorn_master_proc.kill()
-            sys.exit(0)
-
-        def monitor_gunicorn(gunicorn_master_proc: psutil.Process | 
subprocess.Popen) -> NoReturn:
-            # Register signal handlers
-            signal.signal(signal.SIGINT, lambda signum, _: kill_proc(signum, 
gunicorn_master_proc))
-            signal.signal(signal.SIGTERM, lambda signum, _: kill_proc(signum, 
gunicorn_master_proc))
-
-            # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
-            GunicornMonitor(
-                gunicorn_master_pid=gunicorn_master_proc.pid,
-                num_workers_expected=num_workers,
-                master_timeout=120,
-                worker_refresh_interval=30,
-                worker_refresh_batch_size=1,
-                reload_on_plugin_change=False,
-            ).start()
-
-        def start_and_monitor_gunicorn(args):
-            if args.daemon:
-                subprocess.Popen(run_args, close_fds=True)
-
-                # Reading pid of gunicorn master as it will be different that
-                # the one of process spawned above.
-                gunicorn_master_proc_pid = None
-                while not gunicorn_master_proc_pid:
-                    sleep(0.1)
-                    gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file)
-
-                # Run Gunicorn monitor
-                gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
-                monitor_gunicorn(gunicorn_master_proc)
-            else:
-                with subprocess.Popen(run_args, close_fds=True) as 
gunicorn_master_proc:
-                    monitor_gunicorn(gunicorn_master_proc)
-
-        if args.daemon:
-            # This makes possible errors get reported before daemonization
-            os.environ["SKIP_DAGS_PARSING"] = "True"
-            create_app(apps)
-            os.environ.pop("SKIP_DAGS_PARSING")
-
-        pid_file_path = Path(pid_file)
-        monitor_pid_file = 
str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}"))
-        run_command_with_daemon_option(
-            args=args,
-            process_name="fastapi-api",
-            callback=lambda: start_and_monitor_gunicorn(args),
-            should_setup_logging=True,
-            pid_file=monitor_pid_file,
+        setproctitle(f"airflow fastapi_api -- host:{args.hostname} 
port:{args.port}")
+        uvicorn.run(
+            "airflow.api_fastapi.main:app",
+            host=args.hostname,
+            port=args.port,
+            workers=num_workers,
+            timeout_keep_alive=worker_timeout,
+            timeout_graceful_shutdown=worker_timeout,
+            ssl_keyfile=ssl_key,
+            ssl_certfile=ssl_cert,
+            access_log=access_logfile,
         )
 
 
diff --git a/tests/cli/commands/local_commands/test_fastapi_api_command.py 
b/tests/cli/commands/local_commands/test_fastapi_api_command.py
index ffa0b8c98b5..ea5339cfb3e 100644
--- a/tests/cli/commands/local_commands/test_fastapi_api_command.py
+++ b/tests/cli/commands/local_commands/test_fastapi_api_command.py
@@ -16,10 +16,6 @@
 # under the License.
 from __future__ import annotations
 
-import os
-import subprocess
-import sys
-import time
 from unittest import mock
 
 import pytest
@@ -37,73 +33,9 @@ console = Console(width=400, color_system="standard")
 class TestCliFastAPI(_CommonCLIGunicornTestClass):
     main_process_regexp = r"airflow fastapi-api"
 
-    @pytest.mark.execution_timeout(210)
-    def test_cli_fastapi_api_background(self, tmp_path):
-        parent_path = tmp_path / "gunicorn"
-        parent_path.mkdir()
-        pidfile_fastapi_api = parent_path / "pidflow-fastapi-api.pid"
-        pidfile_monitor = parent_path / "pidflow-fastapi-api-monitor.pid"
-        stdout = parent_path / "airflow-fastapi-api.out"
-        stderr = parent_path / "airflow-fastapi-api.err"
-        logfile = parent_path / "airflow-fastapi-api.log"
-        try:
-            # Run fastapi-api as daemon in background. Note that the wait 
method is not called.
-            console.print("[magenta]Starting airflow fastapi-api --daemon")
-            env = os.environ.copy()
-            proc = subprocess.Popen(
-                [
-                    "airflow",
-                    "fastapi-api",
-                    "--daemon",
-                    "--pid",
-                    os.fspath(pidfile_fastapi_api),
-                    "--stdout",
-                    os.fspath(stdout),
-                    "--stderr",
-                    os.fspath(stderr),
-                    "--log-file",
-                    os.fspath(logfile),
-                ],
-                env=env,
-            )
-            assert proc.poll() is None
-
-            pid_monitor = self._wait_pidfile(pidfile_monitor)
-            console.print(f"[blue]Monitor started at {pid_monitor}")
-            pid_fastapi_api = self._wait_pidfile(pidfile_fastapi_api)
-            console.print(f"[blue]FastAPI API started at {pid_fastapi_api}")
-            console.print("[blue]Running airflow fastapi-api process:")
-            # Assert that the fastapi-api and gunicorn processes are running 
(by name rather than pid).
-            assert self._find_process(r"airflow fastapi-api --daemon", 
print_found_process=True)
-            console.print("[blue]Waiting for gunicorn processes:")
-            # wait for gunicorn to start
-            for _ in range(30):
-                if self._find_process(r"^gunicorn"):
-                    break
-                console.print("[blue]Waiting for gunicorn to start ...")
-                time.sleep(1)
-            console.print("[blue]Running gunicorn processes:")
-            assert self._find_all_processes("^gunicorn", 
print_found_process=True)
-            console.print("[magenta]fastapi-api process started successfully.")
-            console.print(
-                "[magenta]Terminating monitor process and expect "
-                "fastapi-api and gunicorn processes to terminate as well"
-            )
-            self._terminate_multiple_process([pid_fastapi_api, pid_monitor])
-            self._check_processes(ignore_running=False)
-            console.print("[magenta]All fastapi-api and gunicorn processes are 
terminated.")
-        except Exception:
-            console.print("[red]Exception occurred. Dumping all logs.")
-            # Dump all logs
-            for file in parent_path.glob("*"):
-                console.print(f"Dumping {file} (size: {file.stat().st_size})")
-                console.print(file.read_text())
-            raise
-
     def test_cli_fastapi_api_debug(self, app):
         with (
             mock.patch("subprocess.Popen") as Popen,
-            mock.patch.object(fastapi_api_command, "GunicornMonitor"),
         ):
             port = "9092"
             hostname = "somehost"
@@ -130,7 +62,6 @@ class TestCliFastAPI(_CommonCLIGunicornTestClass):
         """
         with (
             mock.patch("subprocess.Popen") as Popen,
-            mock.patch.object(fastapi_api_command, "GunicornMonitor"),
             mock.patch("os.environ", autospec=True) as mock_environ,
         ):
             apps_value = "core,execution"
@@ -172,8 +103,7 @@ class TestCliFastAPI(_CommonCLIGunicornTestClass):
         cert_path, key_path = ssl_cert_and_key
 
         with (
-            mock.patch("subprocess.Popen") as Popen,
-            mock.patch.object(fastapi_api_command, "GunicornMonitor"),
+            mock.patch("uvicorn.run") as mock_run,
         ):
             args = self.parser.parse_args(
                 [
@@ -192,39 +122,16 @@ class TestCliFastAPI(_CommonCLIGunicornTestClass):
             )
             fastapi_api_command.fastapi_api(args)
 
-            Popen.assert_called_with(
-                [
-                    sys.executable,
-                    "-m",
-                    "gunicorn",
-                    "--workers",
-                    "4",
-                    "--worker-class",
-                    
"airflow.cli.commands.local_commands.fastapi_api_command.AirflowUvicornWorker",
-                    "--timeout",
-                    "120",
-                    "--bind",
-                    "0.0.0.0:9091",
-                    "--name",
-                    "airflow-fastapi-api",
-                    "--pid",
-                    "/tmp/x.pid",
-                    "--access-logfile",
-                    "-",
-                    "--error-logfile",
-                    "-",
-                    "--config",
-                    "python:airflow.api_fastapi.gunicorn_config",
-                    "--certfile",
-                    str(cert_path),
-                    "--keyfile",
-                    str(key_path),
-                    "--access-logformat",
-                    "custom_log_format",
-                    "airflow.api_fastapi.app:cached_app(apps='core')",
-                    "--preload",
-                ],
-                close_fds=True,
+            mock_run.assert_called_with(
+                "airflow.api_fastapi.main:app",
+                host="0.0.0.0",
+                port=9091,
+                workers=4,
+                timeout_keep_alive=120,
+                timeout_graceful_shutdown=120,
+                ssl_keyfile=str(key_path),
+                ssl_certfile=str(cert_path),
+                access_log="-",
             )
 
     @pytest.mark.parametrize(

Reply via email to