#36916: add support for streaming with TaskGroups
-------------------------------------+-------------------------------------
     Reporter:  Thomas Grainger      |                    Owner:  (none)
         Type:  Uncategorized        |                   Status:  new
    Component:  HTTP handling        |                  Version:  dev
     Severity:  Normal               |               Resolution:
     Keywords:  structured-          |             Triage Stage:
  concurrency, taskgroups            |  Unreviewed
    Has patch:  1                    |      Needs documentation:  0
  Needs tests:  0                    |  Patch needs improvement:  0
Easy pickings:  1                    |                    UI/UX:  0
-------------------------------------+-------------------------------------
Description changed by Thomas Grainger:

Old description:

> https://forum.djangoproject.com/t/streamingresponse-driven-
> by-a-taskgroup/40320/4
> https://github.com/django/new-features/issues/117
>
> ### Feature Description
>
> see  https://forum.djangoproject.com/t/streamingresponse-driven-
> by-a-taskgroup/40320/4
>
> I'd like to be able to write code that combines multiple streams of data:
> ```py
> async def news_and_weather(request: HttpRequest) ->
> StreamingHttpResponse:
>     async def gen() -> AsyncGenerator[bytes]:
>         async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
>             async with tx, connect_ws(ws_url) as conn:
>                 async for msg in conn:
>                     await tx.send(msg)
>
>         async with anyio.create_task_group() as tg:
>             tx, rx =  anyio.create_memory_object_stream[bytes]()
>             with tx, rx:
>                 tg.start_soon(push, "ws://example.com/news", tx.clone())
>                 tg.start_soon(push, "ws://example.com/weather",
> tx.clone())
>                 tx.close()
>                 async for msg in rx:
>                     yield msg  # yield in async generator!! illegal
> inside TaskGroup!
>     return StreamingHttpResponse(gen())
> ```
>

>
> ### Problem
>
> however this doesn’t work because I’m using a yield inside an async
> generator that’s not a context manager, and calling aclosing() on that
> async generator is not sufficient to allow a TaskGroup to cancel itself
> and catch the cancel error.
>
> ```py
>
> from useful_types import SupportsAnext
>
> class AsyncIteratorBytesResource(Protocol):
>     """
>     all the machinery needed to safely run an AsyncGenerator[Bytes]
>
>     (for django-stubs) this allows AsyncGenerator[bytes] but is less
> strict
>     so would also allow a anyio MemoryObjectRecieveStream[bytes]]
>     """
>
>     async def __aiter__(self) -> SupportsAnext[bytes]: ...
>     async def aclose(self) -> object: ...
>

> async def news_and_weather(request: HttpRequest) ->
> StreamingAcmgrHttpResponse:
>     @contextlib.asynccontextmanager
>     async def acmgr_gen() -> AsyncGenerator[AsyncIteratorBytesResource]:
>         async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
>             async with tx, connect_ws(ws_url) as conn:
>                 async for msg in conn:
>                     await tx.send(msg)
>
>         async with anyio.create_task_group() as tg:
>             tx, rx =  anyio.create_memory_object_stream[bytes]()
>             with tx, rx:
>                 tg.start_soon(push, "ws://example.com/news", tx.clone())
>                 tg.start_soon(push, "ws://example.com/weather",
> tx.clone())
>                 tx.close()
>                 yield rx  # yield inside asynccontextmanager, permitted
> inside TaskGroup
>
>     return StreamingAcmgrHttpResponse(acmgr_gen())
> ```
>
> ### Implementation Suggestions
>
> https://github.com/django/django/pull/19364/changes

New description:

 https://forum.djangoproject.com/t/streamingresponse-driven-
 by-a-taskgroup/40320/4
 https://github.com/django/new-features/issues/117


 === Feature Description ===

 see https://forum.djangoproject.com/t/streamingresponse-driven-
 by-a-taskgroup/40320/4

 I’d like to be able to write code that combines multiple streams of data:


 {{{
 async def news_and_weather(request: HttpRequest) -> StreamingHttpResponse:
     async def gen() -> AsyncGenerator[bytes]:
         async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
             async with tx, connect_ws(ws_url) as conn:
                 async for msg in conn:
                     await tx.send(msg)

         async with anyio.create_task_group() as tg:
             tx, rx =  anyio.create_memory_object_stream[bytes]()
             with tx, rx:
                 tg.start_soon(push, "ws://example.com/news", tx.clone())
                 tg.start_soon(push, "ws://example.com/weather",
 tx.clone())
                 tx.close()
                 async for msg in rx:
                     yield msg  # yield in async generator!! illegal inside
 TaskGroup!
     return StreamingHttpResponse(gen())
 }}}



 === Problem ===

 however this doesn’t work because I’m using a yield inside an async
 generator that’s not a context manager, and calling aclosing() on that
 async generator is not sufficient to allow a TaskGroup to cancel itself
 and catch the cancel error.



 {{{
 from useful_types import SupportsAnext

 class AsyncIteratorBytesResource(Protocol):
     """
     all the machinery needed to safely run an AsyncGenerator[Bytes]

     (for django-stubs) this allows AsyncGenerator[bytes] but is less
 strict
     so would also allow a anyio MemoryObjectRecieveStream[bytes]]
     """

     async def __aiter__(self) -> SupportsAnext[bytes]: ...
     async def aclose(self) -> object: ...


 async def news_and_weather(request: HttpRequest) ->
 StreamingAcmgrHttpResponse:
     @contextlib.asynccontextmanager
     async def acmgr_gen() -> AsyncGenerator[AsyncIteratorBytesResource]:
         async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
             async with tx, connect_ws(ws_url) as conn:
                 async for msg in conn:
                     await tx.send(msg)

         async with anyio.create_task_group() as tg:
             tx, rx =  anyio.create_memory_object_stream[bytes]()
             with tx, rx:
                 tg.start_soon(push, "ws://example.com/news", tx.clone())
                 tg.start_soon(push, "ws://example.com/weather",
 tx.clone())
                 tx.close()
                 yield rx  # yield inside asynccontextmanager, permitted
 inside TaskGroup

     return StreamingAcmgrHttpResponse(acmgr_gen())
 }}}


 === Implementation Suggestions ===

 https://github.com/django/django/pull/19364/changes

--
-- 
Ticket URL: <https://code.djangoproject.com/ticket/36916#comment:3>
Django <https://code.djangoproject.com/>
The Web framework for perfectionists with deadlines.

-- 
You received this message because you are subscribed to the Google Groups 
"Django updates" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion visit 
https://groups.google.com/d/msgid/django-updates/0107019c486a3f7a-ca0ddc9a-4142-40a2-8954-18eb0df2c4e6-000000%40eu-central-1.amazonses.com.

Reply via email to