This is an automated email from the ASF dual-hosted git repository. tn pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git
The following commit(s) were added to refs/heads/main by this push: new f635451 simplify db initialization for app / worker, reduce amount of logging in worker, increase sleep in worker loop to reduce cpu usage f635451 is described below commit f6354518c75e33d2623bebd1e4df64d7bc907778 Author: Thomas Neidhart <t...@apache.org> AuthorDate: Wed Mar 12 22:02:53 2025 +0100 simplify db initialization for app / worker, reduce amount of logging in worker, increase sleep in worker loop to reduce cpu usage --- atr/db/__init__.py | 69 +++++++++++++++++++++++++++++++++--------------------- atr/server.py | 2 +- atr/worker.py | 6 ++--- 3 files changed, 46 insertions(+), 31 deletions(-) diff --git a/atr/db/__init__.py b/atr/db/__init__.py index 8209976..b9f8b9d 100644 --- a/atr/db/__init__.py +++ b/atr/db/__init__.py @@ -27,30 +27,30 @@ import sqlalchemy.orm as orm import sqlalchemy.sql as sql import sqlmodel +import atr.config import atr.util as util from asfquart.base import QuartApp +from atr.config import AppConfig _LOGGER: Final = logging.getLogger(__name__) +_global_async_sessionmaker: sqlalchemy.ext.asyncio.async_sessionmaker | None = None _global_sync_engine: sqlalchemy.Engine | None = None -def create_database(app: QuartApp) -> None: +def init_database(app: QuartApp) -> None: + """ + Creates and initializes the database for a QuartApp. + + The database is created and an AsyncSession is registered as extension for the app. + Any pending migrations are executed. 
+ """ + @app.before_serving async def create() -> None: - project_root = app.config["PROJECT_ROOT"] - sqlite_db_path = app.config["SQLITE_DB_PATH"] - sqlite_url = f"sqlite+aiosqlite://{sqlite_db_path}" - # Use aiosqlite for async SQLite access - engine = sqlalchemy.ext.asyncio.create_async_engine( - sqlite_url, - connect_args={ - "check_same_thread": False, - "timeout": 30, - }, - ) + app_config = atr.config.get() + engine = create_async_engine(app_config) - # Create async session factory app.extensions["async_session"] = sqlalchemy.ext.asyncio.async_sessionmaker( bind=engine, class_=sqlalchemy.ext.asyncio.AsyncSession, expire_on_commit=False ) @@ -68,13 +68,14 @@ def create_database(app: QuartApp) -> None: # In dev we'd do this first: # poetry run alembic revision --autogenerate -m "description" # Then review the generated migration in migrations/versions/ and commit it + project_root = app_config.PROJECT_ROOT alembic_ini_path = os.path.join(project_root, "alembic.ini") alembic_cfg = config.Config(alembic_ini_path) # Override the migrations directory location to use project root # TODO: Is it possible to set this in alembic.ini? 
alembic_cfg.set_main_option("script_location", os.path.join(project_root, "migrations")) # Set the database URL in the config - alembic_cfg.set_main_option("sqlalchemy.url", sqlite_url) + alembic_cfg.set_main_option("sqlalchemy.url", str(engine.url)) # command.upgrade(alembic_cfg, "head") # Create any tables that might be missing @@ -82,16 +83,19 @@ def create_database(app: QuartApp) -> None: await conn.run_sync(sqlmodel.SQLModel.metadata.create_all) -def create_async_db_session() -> sqlalchemy.ext.asyncio.AsyncSession: - """Create a new asynchronous database session.""" - if quart.has_app_context(): - extensions = quart.current_app.extensions - return util.validate_as_type(extensions["async_session"](), sqlalchemy.ext.asyncio.AsyncSession) +def init_database_for_worker() -> None: + global _global_async_sessionmaker + + _LOGGER.info(f"Creating database for worker {os.getpid()}") + engine = create_async_engine(atr.config.get()) + _global_async_sessionmaker = sqlalchemy.ext.asyncio.async_sessionmaker( + bind=engine, class_=sqlalchemy.ext.asyncio.AsyncSession, expire_on_commit=False + ) - import atr.config as config - conf = config.get() - sqlite_url = f"sqlite+aiosqlite://{conf.SQLITE_DB_PATH}" +def create_async_engine(app_config: type[AppConfig]) -> sqlalchemy.ext.asyncio.AsyncEngine: + sqlite_url = f"sqlite+aiosqlite://{app_config.SQLITE_DB_PATH}" + # Use aiosqlite for async SQLite access engine = sqlalchemy.ext.asyncio.create_async_engine( sqlite_url, connect_args={ @@ -99,23 +103,34 @@ def create_async_db_session() -> sqlalchemy.ext.asyncio.AsyncSession: "timeout": 30, }, ) - return sqlalchemy.ext.asyncio.async_sessionmaker( - bind=engine, class_=sqlalchemy.ext.asyncio.AsyncSession, expire_on_commit=False - )() + + return engine + + +def create_async_db_session() -> sqlalchemy.ext.asyncio.AsyncSession: + """Create a new asynchronous database session.""" + if quart.has_app_context(): + extensions = quart.current_app.extensions + return 
util.validate_as_type(extensions["async_session"](), sqlalchemy.ext.asyncio.AsyncSession) + else: + if _global_async_sessionmaker is None: + raise RuntimeError("Global async_sessionmaker not initialized, run init_database() first.") + return util.validate_as_type(_global_async_sessionmaker(), sqlalchemy.ext.asyncio.AsyncSession) +# FIXME: this method is deprecated and should be removed def create_sync_db_engine() -> None: """Create a synchronous database engine.""" - import atr.config as config global _global_sync_engine - conf = config.get() + conf = atr.config.get() sqlite_url = f"sqlite://{conf.SQLITE_DB_PATH}" _LOGGER.debug(f"Creating sync database engine in process {os.getpid()}") _global_sync_engine = sqlalchemy.create_engine(sqlite_url, echo=False) +# FIXME: this method is deprecated and should be removed def create_sync_db_session() -> sqlalchemy.orm.Session: """Create a new synchronous database session.""" global _global_sync_engine diff --git a/atr/server.py b/atr/server.py index 509c0cc..87cba7a 100644 --- a/atr/server.py +++ b/atr/server.py @@ -207,7 +207,7 @@ def create_app(app_config: type[config.AppConfig]) -> base.QuartApp: app = app_create_base(app_config) app_setup_api_docs(app) - db.create_database(app) + db.init_database(app) register_routes(app) blueprints.register(app) diff --git a/atr/worker.py b/atr/worker.py index 0214d68..48f2efd 100644 --- a/atr/worker.py +++ b/atr/worker.py @@ -69,7 +69,7 @@ def main() -> None: _setup_logging() _LOGGER.info(f"Starting worker process with pid {os.getpid()}") - # db.create_sync_db_engine() + db.init_database_for_worker() _worker_resources_limit_set() asyncio.run(_worker_loop_run()) @@ -80,7 +80,7 @@ def _setup_logging() -> None: log_format = "[%(asctime)s.%(msecs)03d] [%(process)d] [%(levelname)s] %(message)s" date_format = "%Y-%m-%d %H:%M:%S" - logging.basicConfig(filename="atr-worker.log", format=log_format, datefmt=date_format, level=logging.DEBUG) + logging.basicConfig(filename="atr-worker.log", 
format=log_format, datefmt=date_format, level=logging.INFO) # Task functions @@ -260,7 +260,7 @@ async def _worker_loop_run() -> None: break else: # No tasks available, wait 20ms before checking again - await asyncio.sleep(0.02) + await asyncio.sleep(0.1) except Exception: # TODO: Should probably be more robust about this # Extract the traceback and log it --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@tooling.apache.org For additional commands, e-mail: commits-h...@tooling.apache.org