This is an automated email from the ASF dual-hosted git repository.

tn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git


The following commit(s) were added to refs/heads/main by this push:
     new f635451  simplify db initialization for app / worker, reduce amount of
logging in worker, increase sleep in worker loop to reduce CPU usage
f635451 is described below

commit f6354518c75e33d2623bebd1e4df64d7bc907778
Author: Thomas Neidhart <t...@apache.org>
AuthorDate: Wed Mar 12 22:02:53 2025 +0100

    simplify db initialization for app / worker, reduce amount of logging in
worker, increase sleep in worker loop to reduce CPU usage
---
 atr/db/__init__.py | 69 +++++++++++++++++++++++++++++++++---------------------
 atr/server.py      |  2 +-
 atr/worker.py      |  6 ++---
 3 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/atr/db/__init__.py b/atr/db/__init__.py
index 8209976..b9f8b9d 100644
--- a/atr/db/__init__.py
+++ b/atr/db/__init__.py
@@ -27,30 +27,30 @@ import sqlalchemy.orm as orm
 import sqlalchemy.sql as sql
 import sqlmodel
 
+import atr.config
 import atr.util as util
 from asfquart.base import QuartApp
+from atr.config import AppConfig
 
 _LOGGER: Final = logging.getLogger(__name__)
 
+_global_async_sessionmaker: sqlalchemy.ext.asyncio.async_sessionmaker | None = 
None
 _global_sync_engine: sqlalchemy.Engine | None = None
 
 
-def create_database(app: QuartApp) -> None:
+def init_database(app: QuartApp) -> None:
+    """
+    Creates and initializes the database for a QuartApp.
+
+    The database is created and an AsyncSession is registered as extension for 
the app.
+    Any pending migrations are executed.
+    """
+
     @app.before_serving
     async def create() -> None:
-        project_root = app.config["PROJECT_ROOT"]
-        sqlite_db_path = app.config["SQLITE_DB_PATH"]
-        sqlite_url = f"sqlite+aiosqlite://{sqlite_db_path}"
-        # Use aiosqlite for async SQLite access
-        engine = sqlalchemy.ext.asyncio.create_async_engine(
-            sqlite_url,
-            connect_args={
-                "check_same_thread": False,
-                "timeout": 30,
-            },
-        )
+        app_config = atr.config.get()
+        engine = create_async_engine(app_config)
 
-        # Create async session factory
         app.extensions["async_session"] = 
sqlalchemy.ext.asyncio.async_sessionmaker(
             bind=engine, class_=sqlalchemy.ext.asyncio.AsyncSession, 
expire_on_commit=False
         )
@@ -68,13 +68,14 @@ def create_database(app: QuartApp) -> None:
         # In dev we'd do this first:
         # poetry run alembic revision --autogenerate -m "description"
         # Then review the generated migration in migrations/versions/ and 
commit it
+        project_root = app_config.PROJECT_ROOT
         alembic_ini_path = os.path.join(project_root, "alembic.ini")
         alembic_cfg = config.Config(alembic_ini_path)
         # Override the migrations directory location to use project root
         # TODO: Is it possible to set this in alembic.ini?
         alembic_cfg.set_main_option("script_location", 
os.path.join(project_root, "migrations"))
         # Set the database URL in the config
-        alembic_cfg.set_main_option("sqlalchemy.url", sqlite_url)
+        alembic_cfg.set_main_option("sqlalchemy.url", str(engine.url))
         # command.upgrade(alembic_cfg, "head")
 
         # Create any tables that might be missing
@@ -82,16 +83,19 @@ def create_database(app: QuartApp) -> None:
             await conn.run_sync(sqlmodel.SQLModel.metadata.create_all)
 
 
-def create_async_db_session() -> sqlalchemy.ext.asyncio.AsyncSession:
-    """Create a new asynchronous database session."""
-    if quart.has_app_context():
-        extensions = quart.current_app.extensions
-        return util.validate_as_type(extensions["async_session"](), 
sqlalchemy.ext.asyncio.AsyncSession)
+def init_database_for_worker() -> None:
+    global _global_async_sessionmaker
+
+    _LOGGER.info(f"Creating database for worker {os.getpid()}")
+    engine = create_async_engine(atr.config.get())
+    _global_async_sessionmaker = sqlalchemy.ext.asyncio.async_sessionmaker(
+        bind=engine, class_=sqlalchemy.ext.asyncio.AsyncSession, 
expire_on_commit=False
+    )
 
-    import atr.config as config
 
-    conf = config.get()
-    sqlite_url = f"sqlite+aiosqlite://{conf.SQLITE_DB_PATH}"
+def create_async_engine(app_config: type[AppConfig]) -> 
sqlalchemy.ext.asyncio.AsyncEngine:
+    sqlite_url = f"sqlite+aiosqlite://{app_config.SQLITE_DB_PATH}"
+    # Use aiosqlite for async SQLite access
     engine = sqlalchemy.ext.asyncio.create_async_engine(
         sqlite_url,
         connect_args={
@@ -99,23 +103,34 @@ def create_async_db_session() -> 
sqlalchemy.ext.asyncio.AsyncSession:
             "timeout": 30,
         },
     )
-    return sqlalchemy.ext.asyncio.async_sessionmaker(
-        bind=engine, class_=sqlalchemy.ext.asyncio.AsyncSession, 
expire_on_commit=False
-    )()
+
+    return engine
+
+
+def create_async_db_session() -> sqlalchemy.ext.asyncio.AsyncSession:
+    """Create a new asynchronous database session."""
+    if quart.has_app_context():
+        extensions = quart.current_app.extensions
+        return util.validate_as_type(extensions["async_session"](), 
sqlalchemy.ext.asyncio.AsyncSession)
+    else:
+        if _global_async_sessionmaker is None:
+            raise RuntimeError("Global async_sessionmaker not initialized, run 
init_database() first.")
+        return util.validate_as_type(_global_async_sessionmaker(), 
sqlalchemy.ext.asyncio.AsyncSession)
 
 
+# FIXME: this method is deprecated and should be removed
 def create_sync_db_engine() -> None:
     """Create a synchronous database engine."""
-    import atr.config as config
 
     global _global_sync_engine
 
-    conf = config.get()
+    conf = atr.config.get()
     sqlite_url = f"sqlite://{conf.SQLITE_DB_PATH}"
     _LOGGER.debug(f"Creating sync database engine in process {os.getpid()}")
     _global_sync_engine = sqlalchemy.create_engine(sqlite_url, echo=False)
 
 
+# FIXME: this method is deprecated and should be removed
 def create_sync_db_session() -> sqlalchemy.orm.Session:
     """Create a new synchronous database session."""
     global _global_sync_engine
diff --git a/atr/server.py b/atr/server.py
index 509c0cc..87cba7a 100644
--- a/atr/server.py
+++ b/atr/server.py
@@ -207,7 +207,7 @@ def create_app(app_config: type[config.AppConfig]) -> 
base.QuartApp:
     app = app_create_base(app_config)
     app_setup_api_docs(app)
 
-    db.create_database(app)
+    db.init_database(app)
     register_routes(app)
     blueprints.register(app)
 
diff --git a/atr/worker.py b/atr/worker.py
index 0214d68..48f2efd 100644
--- a/atr/worker.py
+++ b/atr/worker.py
@@ -69,7 +69,7 @@ def main() -> None:
     _setup_logging()
 
     _LOGGER.info(f"Starting worker process with pid {os.getpid()}")
-    # db.create_sync_db_engine()
+    db.init_database_for_worker()
 
     _worker_resources_limit_set()
     asyncio.run(_worker_loop_run())
@@ -80,7 +80,7 @@ def _setup_logging() -> None:
     log_format = "[%(asctime)s.%(msecs)03d] [%(process)d] [%(levelname)s] 
%(message)s"
     date_format = "%Y-%m-%d %H:%M:%S"
 
-    logging.basicConfig(filename="atr-worker.log", format=log_format, 
datefmt=date_format, level=logging.DEBUG)
+    logging.basicConfig(filename="atr-worker.log", format=log_format, 
datefmt=date_format, level=logging.INFO)
 
 
 # Task functions
@@ -260,7 +260,7 @@ async def _worker_loop_run() -> None:
                 break
             else:
                 # No tasks available, wait 20ms before checking again
-                await asyncio.sleep(0.02)
+                await asyncio.sleep(0.1)
         except Exception:
             # TODO: Should probably be more robust about this
             # Extract the traceback and log it


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@tooling.apache.org
For additional commands, e-mail: commits-h...@tooling.apache.org

Reply via email to