This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ae2c59adbea Remove orphaned celery_command file (#46879)
ae2c59adbea is described below
commit ae2c59adbeae687dd52835d24e92da5110cebbd5
Author: Jed Cunningham <[email protected]>
AuthorDate: Thu Feb 20 12:23:04 2025 -0700
Remove orphaned celery_command file (#46879)
* Remove orphaned celery_command file
We don't need this for backcompat any longer - all celery commands are
coming from the provider now - and this isn't even the historical
location of this file :)
* fixup
---
.../cli/commands/local_commands/celery_command.py | 245 --------------
scripts/cov/cli_coverage.py | 1 -
.../commands/local_commands/test_celery_command.py | 361 ---------------------
.../cli/commands/test_celery_command.py | 6 +-
4 files changed, 3 insertions(+), 610 deletions(-)
diff --git a/airflow/cli/commands/local_commands/celery_command.py
b/airflow/cli/commands/local_commands/celery_command.py
deleted file mode 100644
index 034f33e2439..00000000000
--- a/airflow/cli/commands/local_commands/celery_command.py
+++ /dev/null
@@ -1,245 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-# DO NOT MODIFY THIS FILE unless it is a serious bugfix - all the new celery
commands should be added in celery provider.
-# This file is kept for backward compatibility only.
-"""Celery command."""
-
-from __future__ import annotations
-
-import logging
-import sys
-import warnings
-from contextlib import contextmanager
-from multiprocessing import Process
-
-import psutil
-import sqlalchemy.exc
-from celery import maybe_patch_concurrency # type: ignore[attr-defined]
-from celery.app.defaults import DEFAULT_TASK_LOG_FMT
-from celery.signals import after_setup_logger
-from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile
-
-from airflow import settings
-from airflow.cli.commands.local_commands.daemon_utils import
run_command_with_daemon_option
-from airflow.configuration import conf
-from airflow.utils import cli as cli_utils
-from airflow.utils.cli import setup_locations
-from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
-from airflow.utils.serve_logs import serve_logs
-
-WORKER_PROCESS_NAME = "worker"
-
-warnings.warn(
- "Use celery command from providers package, Use celery provider >= 3.6.1",
- DeprecationWarning,
- stacklevel=2,
-)
-
-
-@cli_utils.action_cli
-@providers_configuration_loaded
-def flower(args):
- """Start Flower, Celery monitoring tool."""
- # This needs to be imported locally to not trigger Providers Manager
initialization
- from airflow.providers.celery.executors.celery_executor import app as
celery_app
-
- options = [
- "flower",
- conf.get("celery", "BROKER_URL"),
- f"--address={args.hostname}",
- f"--port={args.port}",
- ]
-
- if args.broker_api:
- options.append(f"--broker-api={args.broker_api}")
-
- if args.url_prefix:
- options.append(f"--url-prefix={args.url_prefix}")
-
- if args.basic_auth:
- options.append(f"--basic-auth={args.basic_auth}")
-
- if args.flower_conf:
- options.append(f"--conf={args.flower_conf}")
-
- run_command_with_daemon_option(
- args=args, process_name="flower", callback=lambda:
celery_app.start(options)
- )
-
-
-@contextmanager
-def _serve_logs(skip_serve_logs: bool = False):
- """Start serve_logs sub-process."""
- sub_proc = None
- if skip_serve_logs is False:
- sub_proc = Process(target=serve_logs)
- sub_proc.start()
- try:
- yield
- finally:
- if sub_proc:
- sub_proc.terminate()
-
-
-@after_setup_logger.connect()
-@providers_configuration_loaded
-def logger_setup_handler(logger, **kwargs):
- """
- Reconfigure the logger.
-
- * remove any previously configured handlers
- * logs of severity error, and above goes to stderr,
- * logs of severity lower than error goes to stdout.
- """
- if conf.getboolean("logging", "celery_stdout_stderr_separation",
fallback=False):
- celery_formatter = logging.Formatter(DEFAULT_TASK_LOG_FMT)
-
- class NoErrorOrAboveFilter(logging.Filter):
- """Allow only logs with level *lower* than ERROR to be reported."""
-
- def filter(self, record):
- return record.levelno < logging.ERROR
-
- below_error_handler = logging.StreamHandler(sys.stdout)
- below_error_handler.addFilter(NoErrorOrAboveFilter())
- below_error_handler.setFormatter(celery_formatter)
-
- from_error_handler = logging.StreamHandler(sys.stderr)
- from_error_handler.setLevel(logging.ERROR)
- from_error_handler.setFormatter(celery_formatter)
-
- logger.handlers[:] = [below_error_handler, from_error_handler]
-
-
-@cli_utils.action_cli
-@providers_configuration_loaded
-def worker(args):
- """Start Airflow Celery worker."""
- # This needs to be imported locally to not trigger Providers Manager
initialization
- from airflow.providers.celery.executors.celery_executor import app as
celery_app
-
- # Disable connection pool so that celery worker does not hold an
unnecessary db connection
- settings.reconfigure_orm(disable_connection_pool=True)
- if not settings.validate_session():
- raise SystemExit("Worker exiting, database connection precheck
failed.")
-
- autoscale = args.autoscale
- skip_serve_logs = args.skip_serve_logs
-
- if autoscale is None and conf.has_option("celery", "worker_autoscale"):
- autoscale = conf.get("celery", "worker_autoscale")
-
- if hasattr(celery_app.backend, "ResultSession"):
- # Pre-create the database tables now, otherwise SQLA via Celery has a
- # race condition where one of the subprocesses can die with "Table
- # already exists" error, because SQLA checks for which tables exist,
- # then issues a CREATE TABLE, rather than doing CREATE TABLE IF NOT
- # EXISTS
- try:
- session = celery_app.backend.ResultSession()
- session.close()
- except sqlalchemy.exc.IntegrityError:
- # At least on postgres, trying to create a table that already exist
- # gives a unique constraint violation or the
- # "pg_type_typname_nsp_index" table. If this happens we can ignore
- # it, we raced to create the tables and lost.
- pass
-
- # backwards-compatible:
https://github.com/apache/airflow/pull/21506#pullrequestreview-879893763
- celery_log_level = conf.get("logging", "CELERY_LOGGING_LEVEL")
- if not celery_log_level:
- celery_log_level = conf.get("logging", "LOGGING_LEVEL")
-
- # Setup Celery worker
- options = [
- "worker",
- "-O",
- "fair",
- "--queues",
- args.queues,
- "--concurrency",
- args.concurrency,
- "--hostname",
- args.celery_hostname,
- "--loglevel",
- celery_log_level,
- ]
- if autoscale:
- options.extend(["--autoscale", autoscale])
- if args.without_mingle:
- options.append("--without-mingle")
- if args.without_gossip:
- options.append("--without-gossip")
-
- if conf.has_option("celery", "pool"):
- pool = conf.get("celery", "pool")
- options.extend(["--pool", pool])
- # Celery pools of type eventlet and gevent use greenlets, which
- # requires monkey patching the app:
- # https://eventlet.net/doc/patching.html#monkey-patch
- # Otherwise task instances hang on the workers and are never
- # executed.
- maybe_patch_concurrency(["-P", pool])
-
- worker_pid_file_path, stdout, stderr, log_file = setup_locations(
- process=WORKER_PROCESS_NAME,
- stdout=args.stdout,
- stderr=args.stderr,
- log=args.log_file,
- pid=args.pid,
- )
-
- def run_celery_worker():
- with _serve_logs(skip_serve_logs):
- celery_app.worker_main(options)
-
- if args.umask:
- umask = args.umask
- else:
- umask = conf.get("celery", "worker_umask",
fallback=settings.DAEMON_UMASK)
-
- run_command_with_daemon_option(
- args=args,
- process_name=WORKER_PROCESS_NAME,
- callback=run_celery_worker,
- should_setup_logging=True,
- umask=umask,
- pid_file=worker_pid_file_path,
- )
-
-
-@cli_utils.action_cli
-@providers_configuration_loaded
-def stop_worker(args):
- """Send SIGTERM to Celery worker."""
- # Read PID from file
- if args.pid:
- pid_file_path = args.pid
- else:
- pid_file_path, _, _, _ = setup_locations(process=WORKER_PROCESS_NAME)
- pid = read_pid_from_pidfile(pid_file_path)
-
- # Send SIGTERM
- if pid:
- worker_process = psutil.Process(pid)
- worker_process.terminate()
-
- # Remove pid file
- remove_existing_pidfile(pid_file_path)
diff --git a/scripts/cov/cli_coverage.py b/scripts/cov/cli_coverage.py
index b8b37423f7a..08caf1f01d0 100644
--- a/scripts/cov/cli_coverage.py
+++ b/scripts/cov/cli_coverage.py
@@ -30,7 +30,6 @@ cli_files = ["tests/cli"]
files_not_fully_covered = [
"airflow/cli/cli_config.py",
"airflow/cli/cli_parser.py",
- "airflow/cli/commands/local_commands/celery_command.py",
"airflow/cli/commands/remote_commands/config_command.py",
"airflow/cli/commands/remote_commands/connection_command.py",
"airflow/cli/commands/remote_commands/dag_command.py",
diff --git a/tests/cli/commands/local_commands/test_celery_command.py
b/tests/cli/commands/local_commands/test_celery_command.py
deleted file mode 100644
index da7f770e151..00000000000
--- a/tests/cli/commands/local_commands/test_celery_command.py
+++ /dev/null
@@ -1,361 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import importlib
-import os
-from argparse import Namespace
-from unittest import mock
-
-import pytest
-import sqlalchemy
-
-import airflow
-from airflow.cli import cli_parser
-from airflow.cli.commands.local_commands import celery_command
-from airflow.configuration import conf
-from airflow.executors import executor_loader
-
-from tests_common.test_utils.config import conf_vars
-
-pytestmark = pytest.mark.db_test
-
-
-class TestWorkerPrecheck:
- @mock.patch("airflow.settings.validate_session")
- def test_error(self, mock_validate_session):
- """
- Test to verify the exit mechanism of airflow-worker cli
- by mocking validate_session method
- """
- mock_validate_session.return_value = False
- with pytest.raises(SystemExit) as ctx, conf_vars({("core",
"executor"): "CeleryExecutor"}):
- celery_command.worker(Namespace(queues=1, concurrency=1))
- assert str(ctx.value) == "Worker exiting, database connection precheck
failed."
-
- @conf_vars({("celery", "worker_precheck"): "False"})
- def test_worker_precheck_exception(self):
- """
- Test to check the behaviour of validate_session method
- when worker_precheck is absent in airflow configuration
- """
- assert airflow.settings.validate_session()
-
- @mock.patch("sqlalchemy.orm.session.Session.execute")
- @conf_vars({("celery", "worker_precheck"): "True"})
- def test_validate_session_dbapi_exception(self, mock_session):
- """
- Test to validate connection failure scenario on SELECT 1 query
- """
- mock_session.side_effect = sqlalchemy.exc.OperationalError("m1", "m2",
"m3", "m4")
- assert airflow.settings.validate_session() is False
-
-
[email protected]("mysql", "postgres")
-class TestCeleryStopCommand:
- @classmethod
- def setup_class(cls):
- with conf_vars({("core", "executor"): "CeleryExecutor"}):
- importlib.reload(executor_loader)
- importlib.reload(cli_parser)
- cls.parser = cli_parser.get_parser()
-
-
@mock.patch("airflow.cli.commands.local_commands.celery_command.setup_locations")
-
@mock.patch("airflow.cli.commands.local_commands.celery_command.psutil.Process")
- def test_if_right_pid_is_read(self, mock_process, mock_setup_locations,
tmp_path):
- args = self.parser.parse_args(["celery", "stop"])
- pid = "123"
- path = tmp_path / "testfile"
- # Create pid file
- path.write_text(pid)
- # Setup mock
- mock_setup_locations.return_value = (os.fspath(path), None, None, None)
-
- # Calling stop_worker should delete the temporary pid file
- celery_command.stop_worker(args)
- # Check if works as expected
- assert not path.exists()
- mock_process.assert_called_once_with(int(pid))
- mock_process.return_value.terminate.assert_called_once_with()
-
-
@mock.patch("airflow.cli.commands.local_commands.celery_command.read_pid_from_pidfile")
- @mock.patch("airflow.providers.celery.executors.celery_executor.app")
-
@mock.patch("airflow.cli.commands.local_commands.celery_command.setup_locations")
- def test_same_pid_file_is_used_in_start_and_stop(
- self, mock_setup_locations, mock_celery_app, mock_read_pid_from_pidfile
- ):
- pid_file = "test_pid_file"
- mock_setup_locations.return_value = (pid_file, None, None, None)
- mock_read_pid_from_pidfile.return_value = None
-
- # Call worker
- worker_args = self.parser.parse_args(["celery", "worker",
"--skip-serve-logs"])
- celery_command.worker(worker_args)
- assert mock_celery_app.worker_main.call_args
- args, _ = mock_celery_app.worker_main.call_args
- args_str = " ".join(map(str, args[0]))
- assert f"--pidfile {pid_file}" not in args_str
-
- # Call stop
- stop_args = self.parser.parse_args(["celery", "stop"])
- celery_command.stop_worker(stop_args)
- mock_read_pid_from_pidfile.assert_called_once_with(pid_file)
-
-
@mock.patch("airflow.cli.commands.local_commands.celery_command.remove_existing_pidfile")
-
@mock.patch("airflow.cli.commands.local_commands.celery_command.read_pid_from_pidfile")
- @mock.patch("airflow.providers.celery.executors.celery_executor.app")
-
@mock.patch("airflow.cli.commands.local_commands.celery_command.psutil.Process")
-
@mock.patch("airflow.cli.commands.local_commands.celery_command.setup_locations")
- def test_custom_pid_file_is_used_in_start_and_stop(
- self,
- mock_setup_locations,
- mock_process,
- mock_celery_app,
- mock_read_pid_from_pidfile,
- mock_remove_existing_pidfile,
- ):
- pid_file = "custom_test_pid_file"
- mock_setup_locations.return_value = (pid_file, None, None, None)
- # Call worker
- worker_args = self.parser.parse_args(["celery", "worker",
"--skip-serve-logs", "--pid", pid_file])
- celery_command.worker(worker_args)
- assert mock_celery_app.worker_main.call_args
- args, _ = mock_celery_app.worker_main.call_args
- args_str = " ".join(map(str, args[0]))
- assert f"--pidfile {pid_file}" not in args_str
-
- stop_args = self.parser.parse_args(["celery", "stop", "--pid",
pid_file])
- celery_command.stop_worker(stop_args)
-
- mock_read_pid_from_pidfile.assert_called_once_with(pid_file)
- mock_process.return_value.terminate.assert_called()
- mock_remove_existing_pidfile.assert_called_once_with(pid_file)
-
-
[email protected]("mysql", "postgres")
-class TestWorkerStart:
- @classmethod
- def setup_class(cls):
- with conf_vars({("core", "executor"): "CeleryExecutor"}):
- importlib.reload(executor_loader)
- importlib.reload(cli_parser)
- cls.parser = cli_parser.get_parser()
-
-
@mock.patch("airflow.cli.commands.local_commands.celery_command.setup_locations")
- @mock.patch("airflow.cli.commands.local_commands.celery_command.Process")
- @mock.patch("airflow.providers.celery.executors.celery_executor.app")
- def test_worker_started_with_required_arguments(self, mock_celery_app,
mock_popen, mock_locations):
- pid_file = "pid_file"
- mock_locations.return_value = (pid_file, None, None, None)
- concurrency = "1"
- celery_hostname = "celery_hostname"
- queues = "queue"
- autoscale = "2,5"
- args = self.parser.parse_args(
- [
- "celery",
- "worker",
- "--autoscale",
- autoscale,
- "--concurrency",
- concurrency,
- "--celery-hostname",
- celery_hostname,
- "--queues",
- queues,
- "--without-mingle",
- "--without-gossip",
- ]
- )
-
- celery_command.worker(args)
-
- mock_celery_app.worker_main.assert_called_once_with(
- [
- "worker",
- "-O",
- "fair",
- "--queues",
- queues,
- "--concurrency",
- int(concurrency),
- "--hostname",
- celery_hostname,
- "--loglevel",
- conf.get("logging", "CELERY_LOGGING_LEVEL"),
- "--autoscale",
- autoscale,
- "--without-mingle",
- "--without-gossip",
- "--pool",
- "prefork",
- ]
- )
-
-
[email protected]("mysql", "postgres")
-class TestWorkerFailure:
- @classmethod
- def setup_class(cls):
- with conf_vars({("core", "executor"): "CeleryExecutor"}):
- importlib.reload(executor_loader)
- importlib.reload(cli_parser)
- cls.parser = cli_parser.get_parser()
-
- @mock.patch("airflow.cli.commands.local_commands.celery_command.Process")
- @mock.patch("airflow.providers.celery.executors.celery_executor.app")
- def test_worker_failure_gracefull_shutdown(self, mock_celery_app,
mock_popen):
- args = self.parser.parse_args(["celery", "worker"])
- mock_celery_app.run.side_effect = Exception("Mock exception to trigger
runtime error")
- try:
- celery_command.worker(args)
- finally:
- mock_popen().terminate.assert_called()
-
-
[email protected]("mysql", "postgres")
-class TestFlowerCommand:
- @classmethod
- def setup_class(cls):
- with conf_vars({("core", "executor"): "CeleryExecutor"}):
- importlib.reload(executor_loader)
- importlib.reload(cli_parser)
- cls.parser = cli_parser.get_parser()
-
- @mock.patch("airflow.providers.celery.executors.celery_executor.app")
- def test_run_command(self, mock_celery_app):
- args = self.parser.parse_args(
- [
- "celery",
- "flower",
- "--basic-auth",
- "admin:admin",
- "--broker-api",
- "http://username:password@rabbitmq-server-name:15672/api/",
- "--flower-conf",
- "flower_config",
- "--hostname",
- "my-hostname",
- "--port",
- "3333",
- "--url-prefix",
- "flower-monitoring",
- ]
- )
-
- celery_command.flower(args)
- mock_celery_app.start.assert_called_once_with(
- [
- "flower",
- conf.get("celery", "BROKER_URL"),
- "--address=my-hostname",
- "--port=3333",
-
"--broker-api=http://username:password@rabbitmq-server-name:15672/api/",
- "--url-prefix=flower-monitoring",
- "--basic-auth=admin:admin",
- "--conf=flower_config",
- ]
- )
-
-
@mock.patch("airflow.cli.commands.local_commands.daemon_utils.TimeoutPIDLockFile")
-
@mock.patch("airflow.cli.commands.local_commands.daemon_utils.setup_locations")
- @mock.patch("airflow.cli.commands.local_commands.daemon_utils.daemon")
- @mock.patch("airflow.providers.celery.executors.celery_executor.app")
- @pytest.mark.usefixtures("capfd") # This test needs fd capturing to work
- def test_run_command_daemon(self, mock_celery_app, mock_daemon,
mock_setup_locations, mock_pid_file):
- mock_setup_locations.return_value = (
- mock.MagicMock(name="pidfile"),
- mock.MagicMock(name="stdout"),
- mock.MagicMock(name="stderr"),
- mock.MagicMock(name="INVALID"),
- )
- args = self.parser.parse_args(
- [
- "celery",
- "flower",
- "--basic-auth",
- "admin:admin",
- "--broker-api",
- "http://username:password@rabbitmq-server-name:15672/api/",
- "--flower-conf",
- "flower_config",
- "--hostname",
- "my-hostname",
- "--log-file",
- "/tmp/flower.log",
- "--pid",
- "/tmp/flower.pid",
- "--port",
- "3333",
- "--stderr",
- "/tmp/flower-stderr.log",
- "--stdout",
- "/tmp/flower-stdout.log",
- "--url-prefix",
- "flower-monitoring",
- "--daemon",
- ]
- )
- mock_open = mock.mock_open()
- with
mock.patch("airflow.cli.commands.local_commands.daemon_utils.open", mock_open):
- celery_command.flower(args)
-
- mock_celery_app.start.assert_called_once_with(
- [
- "flower",
- conf.get("celery", "BROKER_URL"),
- "--address=my-hostname",
- "--port=3333",
-
"--broker-api=http://username:password@rabbitmq-server-name:15672/api/",
- "--url-prefix=flower-monitoring",
- "--basic-auth=admin:admin",
- "--conf=flower_config",
- ]
- )
- assert mock_daemon.mock_calls[:3] == [
- mock.call.DaemonContext(
- pidfile=mock_pid_file.return_value,
- files_preserve=None,
- stdout=mock_open.return_value,
- stderr=mock_open.return_value,
- umask=0o077,
- ),
- mock.call.DaemonContext().__enter__(),
- mock.call.DaemonContext().__exit__(None, None, None),
- ]
-
- assert mock_setup_locations.mock_calls == [
- mock.call(
- process="flower",
- pid="/tmp/flower.pid",
- stdout="/tmp/flower-stdout.log",
- stderr="/tmp/flower-stderr.log",
- log="/tmp/flower.log",
- )
- ]
-
mock_pid_file.assert_has_calls([mock.call(mock_setup_locations.return_value[0],
-1)])
- assert mock_open.mock_calls == [
- mock.call(mock_setup_locations.return_value[1], "a"),
- mock.call().__enter__(),
- mock.call(mock_setup_locations.return_value[2], "a"),
- mock.call().__enter__(),
- mock.call().truncate(0),
- mock.call().truncate(0),
- mock.call().__exit__(None, None, None),
- mock.call().__exit__(None, None, None),
- ]
diff --git a/tests/integration/cli/commands/test_celery_command.py
b/tests/integration/cli/commands/test_celery_command.py
index 14115166255..db798ba8b71 100644
--- a/tests/integration/cli/commands/test_celery_command.py
+++ b/tests/integration/cli/commands/test_celery_command.py
@@ -23,8 +23,8 @@ from unittest import mock
import pytest
from airflow.cli import cli_parser
-from airflow.cli.commands.local_commands import celery_command
from airflow.executors import executor_loader
+from airflow.providers.celery.cli import celery_command
from tests_common.test_utils.config import conf_vars
@@ -44,7 +44,7 @@ class TestWorkerServeLogs:
@conf_vars({("core", "executor"): "CeleryExecutor"})
def test_serve_logs_on_worker_start(self):
with (
-
mock.patch("airflow.cli.commands.local_commands.celery_command.Process") as
mock_process,
+ mock.patch("airflow.providers.celery.cli.celery_command.Process")
as mock_process,
mock.patch("airflow.providers.celery.executors.celery_executor.app"),
):
args = self.parser.parse_args(["celery", "worker",
"--concurrency", "1"])
@@ -57,7 +57,7 @@ class TestWorkerServeLogs:
@conf_vars({("core", "executor"): "CeleryExecutor"})
def test_skip_serve_logs_on_worker_start(self):
with (
-
mock.patch("airflow.cli.commands.local_commands.celery_command.Process") as
mock_popen,
+ mock.patch("airflow.providers.celery.cli.celery_command.Process")
as mock_popen,
mock.patch("airflow.providers.celery.executors.celery_executor.app"),
):
args = self.parser.parse_args(["celery", "worker",
"--concurrency", "1", "--skip-serve-logs"])