uranusjr commented on code in PR #45103:
URL: https://github.com/apache/airflow/pull/45103#discussion_r1923168350
##########
airflow/cli/commands/local_commands/fastapi_api_command.py:
##########
@@ -96,121 +78,27 @@ def fastapi_api(args):
log.info(
textwrap.dedent(
f"""\
- Running the Gunicorn Server with:
+ Running the uvicorn with:
Apps: {apps}
- Workers: {num_workers} {worker_class}
+ Workers: {num_workers}
Host: {args.hostname}:{args.port}
Timeout: {worker_timeout}
- Logfiles: {access_logfile} {error_logfile}
+ Logfiles: {access_logfile}
Access Logformat: {access_logformat}
================================================================="""
)
)
-
- pid_file, _, _, _ = setup_locations("fastapi-api", pid=args.pid)
-
- run_args = [
- sys.executable,
- "-m",
- "gunicorn",
- "--workers",
- str(num_workers),
- "--worker-class",
- str(worker_class),
- "--timeout",
- str(worker_timeout),
- "--bind",
- args.hostname + ":" + str(args.port),
- "--name",
- "airflow-fastapi-api",
- "--pid",
- pid_file,
- "--access-logfile",
- str(access_logfile),
- "--error-logfile",
- str(error_logfile),
- "--config",
- "python:airflow.api_fastapi.gunicorn_config",
- ]
-
ssl_cert, ssl_key = _get_ssl_cert_and_key_filepaths(args)
- if ssl_cert and ssl_key:
- run_args += ["--certfile", ssl_cert, "--keyfile", ssl_key]
-
- if args.access_logformat and args.access_logformat.strip():
- run_args += ["--access-logformat", str(args.access_logformat)]
-
- if args.daemon:
- run_args += ["--daemon"]
-
- run_args += [f"airflow.api_fastapi.app:cached_app(apps='{apps}')"]
-
- # To prevent different workers creating the web app and
- # all writing to the database at the same time, we use the --preload
option.
- # With the preload option, the app is loaded before the workers are
forked, and each worker will
- # then have a copy of the app
- run_args += ["--preload"]
-
- def kill_proc(signum: int, gunicorn_master_proc: psutil.Process |
subprocess.Popen) -> NoReturn:
- log.info("Received signal: %s. Closing gunicorn.", signum)
- gunicorn_master_proc.terminate()
- with suppress(TimeoutError):
- gunicorn_master_proc.wait(timeout=30)
- if isinstance(gunicorn_master_proc, subprocess.Popen):
- still_running = gunicorn_master_proc.poll() is not None
- else:
- still_running = gunicorn_master_proc.is_running()
- if still_running:
- gunicorn_master_proc.kill()
- sys.exit(0)
-
- def monitor_gunicorn(gunicorn_master_proc: psutil.Process |
subprocess.Popen) -> NoReturn:
- # Register signal handlers
- signal.signal(signal.SIGINT, lambda signum, _: kill_proc(signum,
gunicorn_master_proc))
- signal.signal(signal.SIGTERM, lambda signum, _: kill_proc(signum,
gunicorn_master_proc))
-
- # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
- GunicornMonitor(
- gunicorn_master_pid=gunicorn_master_proc.pid,
- num_workers_expected=num_workers,
- master_timeout=120,
- worker_refresh_interval=30,
- worker_refresh_batch_size=1,
- reload_on_plugin_change=False,
- ).start()
-
- def start_and_monitor_gunicorn(args):
- if args.daemon:
- subprocess.Popen(run_args, close_fds=True)
-
- # Reading pid of gunicorn master as it will be different that
- # the one of process spawned above.
- gunicorn_master_proc_pid = None
- while not gunicorn_master_proc_pid:
- sleep(0.1)
- gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file)
-
- # Run Gunicorn monitor
- gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
- monitor_gunicorn(gunicorn_master_proc)
- else:
- with subprocess.Popen(run_args, close_fds=True) as
gunicorn_master_proc:
- monitor_gunicorn(gunicorn_master_proc)
-
- if args.daemon:
- # This makes possible errors get reported before daemonization
- os.environ["SKIP_DAGS_PARSING"] = "True"
- create_app(apps)
- os.environ.pop("SKIP_DAGS_PARSING")
-
- pid_file_path = Path(pid_file)
- monitor_pid_file =
str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}"))
- run_command_with_daemon_option(
- args=args,
- process_name="fastapi-api",
- callback=lambda: start_and_monitor_gunicorn(args),
- should_setup_logging=True,
- pid_file=monitor_pid_file,
+ uvicorn.run(
+ "airflow.api_fastapi.main:app",
+ host=args.hostname,
+ port=args.port,
+ workers=num_workers,
+ timeout_keep_alive=worker_timeout,
+ timeout_graceful_shutdown=worker_timeout,
+ ssl_keyfile=ssl_key,
+ ssl_certfile=ssl_cert,
+ access_log=access_logfile,
Review Comment:
We should redo the setproctitle part too, if possible; it’s useful for
identifying processes to kill. (Not sure how we can do it for uvicorn workers.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]