It's not clear what would cause the time delay; it looks like a timeout after a 
deadlock of some kind.  However, the first thing to fix is that your async code 
is written incorrectly: it shares the same AsyncSession between two concurrent 
async tasks, which will cause random errors at any number of levels (ORM 
Session, Core connection, asyncpg connection) and is likely related to your 
issue.

For your asyncio.gather() call, each task should receive (or create on its own) 
a separate AsyncSession that's independent of the others, and each AsyncSession 
should proceed on its own connection/transaction.  You can't do concurrent work 
within a single transaction, even with an asyncpg connection.
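
As a rough sketch of that arrangement (not a drop-in fix), something along 
these lines gives every task its own session.  The helper names 
run_in_own_session, gather_with_separate_sessions and make_session_factory are 
made up for illustration; the session factory mirrors the sessionmaker(..., 
class_=AsyncSession) setup from your script, and I'm assuming each "work" 
callable takes the session as its only argument.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List

# A "work" callable receives the session it should use and returns a result.
Work = Callable[[Any], Awaitable[Any]]


async def run_in_own_session(session_factory: Callable[[], Any], work: Work) -> Any:
    # Open a session that belongs to this task only; it is closed when the
    # task finishes, so no other task ever touches it.
    async with session_factory() as session:
        return await work(session)


async def gather_with_separate_sessions(
    session_factory: Callable[[], Any], works: Iterable[Work]
) -> List[Any]:
    # One session (and therefore one connection/transaction) per task.
    return await asyncio.gather(
        *(run_in_own_session(session_factory, work) for work in works)
    )


def make_session_factory(url: str):
    # Hypothetical helper: builds the same kind of factory as the original
    # script.  Imported lazily so the helpers above carry no driver dependency.
    from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
    from sqlalchemy.orm import sessionmaker

    engine = create_async_engine(url)
    return sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
```

In your _search(), that would look roughly like building the factory once with 
make_session_factory("postgresql+asyncpg://...") and then awaiting 
gather_with_separate_sessions(factory, [lambda s: get_areas(s, _geom), 
lambda s: get_areas(s, _geom)]), so that the two get_areas() calls never see 
the same AsyncSession.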

On Wed, Dec 7, 2022, at 1:41 PM, Jaime Valdez wrote:
> Hi group,
> 
> I'm having some issues running queries with create_async_engine.  The 
> following code is the fastest version:
> 
> import time
> import asyncio
> 
> from typing import Any, Dict, List
> 
> from sqlalchemy import create_engine
> from sqlalchemy.orm import sessionmaker
> from sqlalchemy import func, select
> 
> from asyncio import run as aiorun
> 
> import models
> 
> def get_session():
>     engine = create_engine(
>         "postgresql+psycopg2://postgres:postgres@127.0.0.1:5431/db",
>         echo=True,
>         pool_size=20,
>         max_overflow=0,
>         pool_pre_ping=True,
>     )
>     return sessionmaker(bind=engine)
> 
> sess = get_session()
> session = sess()
> 
> _geom = "SRID=4269;POLYGON(( ... ))"
> 
> async def get_areas(geometry) -> List[Dict[str, Any]]:
>     _res = []
>    
>     sql = select(
>         models.GeoTable, func.ST_Intersection(models.GeoTable.geom, geometry)
>     ).where(func.ST_Intersects(models.GeoTable.geom, geometry))
> 
>     if rows := session.execute(sql).scalars():
>         _res.extend(
>             {
>                 "column": _f.column_value,
>             }
>             for _f in rows
>         )
>     return _res
> 
> loop = asyncio.get_event_loop()
> 
> async def _search():
>     tic = time.perf_counter()
>    
>     (
>         result1,
>         result2,
>     ) = loop.run_until_complete(asyncio.gather(
>         get_areas(_geom),
>         get_areas(_geom)
>     ))    
>     print(result1)
>     print(result2)
> 
>     toc = time.perf_counter()
>     print(f"{toc - tic:0.4f} seconds")
> 
> aiorun(_search())
> loop.close()
> 
> This code takes about 1.5 seconds to complete. The other approach, using the 
> async engine, is shown in the code below:
> 
> import time
> import asyncio
> 
> from typing import Any, Dict, List, AsyncGenerator
> 
> from sqlalchemy.ext.asyncio import create_async_engine
> from sqlalchemy.pool import NullPool
> from sqlalchemy.ext.asyncio import AsyncSession
> from sqlalchemy.orm import sessionmaker
> from sqlalchemy import func, select
> 
> from asyncio import run as aiorun
> 
> import models
> 
> async_engine = create_async_engine(
>     "postgresql+asyncpg://postgres:postgres@127.0.0.1:5431/db",
> )
> 
> async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
>     async_session = sessionmaker(
>         bind=async_engine, class_=AsyncSession, expire_on_commit=False
>     )
>     async with async_session() as session:
>         yield session
> 
> _geom = "SRID=4269;POLYGON(( ... ))"
> 
> async def get_areas(session, geometry) -> List[Dict[str, Any]]:
>     sql = select(
>         models.GeoTable, func.ST_Intersection(models.GeoTable.geom, geometry)
>     ).where(func.ST_Intersects(models.GeoTable.geom, geometry))
> 
>     rows = await session.execute(sql)
>     return [
>         {
>             "column": _f.column_value,
>         }
>         for _r in rows.all()
>     ]
> 
> async def _search():
>     tic = time.perf_counter()
> 
>     async for session in get_async_session():    
>         (
>             result1,
>             result2,
>         ) = await asyncio.gather(
>             get_areas(session, _geom),
>             get_areas(session, _geom),
>         )
>         print(result1)
>         print(result2)
> 
>     toc = time.perf_counter()
>     print(f"{toc - tic:0.4f} seconds")
> 
> aiorun(_search())
> 
> This code takes about 354.1 seconds to complete. Am I doing something wrong?
> 
> Thanks!
> 
> 
> -- 
> SQLAlchemy - 
> The Python SQL Toolkit and Object Relational Mapper
>  
> http://www.sqlalchemy.org/
>  
> To post example code, please provide an MCVE: Minimal, Complete, and 
> Verifiable Example. See http://stackoverflow.com/help/mcve for a full 
> description.
> --- 
> You received this message because you are subscribed to the Google Groups 
> "sqlalchemy" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to sqlalchemy+unsubscr...@googlegroups.com.
> To view this discussion on the web visit 
> https://groups.google.com/d/msgid/sqlalchemy/21c67742-87a5-4ab4-b5e5-4b8ae7f50900n%40googlegroups.com
>  
> <https://groups.google.com/d/msgid/sqlalchemy/21c67742-87a5-4ab4-b5e5-4b8ae7f50900n%40googlegroups.com?utm_medium=email&utm_source=footer>.
