#36916: add support for streaming with TaskGroups
-------------------------------------+-------------------------------------
Reporter: Thomas Grainger | Owner: Amar
Type: Uncategorized | Status: assigned
Component: HTTP handling | Version: dev
Severity: Normal | Resolution:
Keywords: structured-concurrency, taskgroups | Triage Stage: Unreviewed
Has patch: 1 | Needs documentation: 0
Needs tests: 0 | Patch needs improvement: 0
Easy pickings: 1 | UI/UX: 0
-------------------------------------+-------------------------------------
Changes (by Amar):
* owner: (none) => Amar
* status: new => assigned
New description:
https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4
https://github.com/django/new-features/issues/117
### Feature Description
See https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4
I'd like to be able to write code that combines multiple streams of data:
```py
async def news_and_weather(request: HttpRequest) -> StreamingHttpResponse:
    async def gen() -> AsyncGenerator[bytes]:
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx = anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, "ws://example.com/news", tx.clone())
                tg.start_soon(push, "ws://example.com/weather", tx.clone())
                tx.close()
                async for msg in rx:
                    yield msg  # yield in async generator!! illegal inside TaskGroup!

    return StreamingHttpResponse(gen())
```
### Problem
However, this doesn’t work because I’m using a yield inside an async
generator that’s not a context manager, and calling aclosing() on that
async generator is not sufficient to allow a TaskGroup to cancel itself
and catch the cancellation error. Yielding the stream from an async
context manager instead would be allowed:
```py
import contextlib
from typing import Protocol

from useful_types import SupportsAnext


class AsyncIteratorBytesResource(Protocol):
    """
    All the machinery needed to safely run an AsyncGenerator[bytes].

    (for django-stubs) this allows AsyncGenerator[bytes] but is less strict,
    so would also allow an anyio MemoryObjectReceiveStream[bytes].
    """

    # __aiter__ is a plain method: it returns the iterator, not a coroutine.
    def __aiter__(self) -> SupportsAnext[bytes]: ...
    async def aclose(self) -> object: ...


async def news_and_weather(request: HttpRequest) -> StreamingAcmgrHttpResponse:
    @contextlib.asynccontextmanager
    async def acmgr_gen() -> AsyncGenerator[AsyncIteratorBytesResource]:
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx = anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, "ws://example.com/news", tx.clone())
                tg.start_soon(push, "ws://example.com/weather", tx.clone())
                tx.close()
                yield rx  # yield inside asynccontextmanager, permitted inside TaskGroup

    return StreamingAcmgrHttpResponse(acmgr_gen())
```
### Implementation Suggestions
https://github.com/django/django/pull/19364/changes
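
As a rough illustration of what the consuming side has to do (this is not the
code from the PR above, and `stream_body`, `Send`, and `make_demo_body` are
hypothetical names), a response that accepts an async context manager would
presumably enter it, forward each chunk, and exit it, all within one task:

```python
import asyncio
import contextlib
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import AbstractAsyncContextManager

# Hypothetical callback type: "write one chunk to the client".
Send = Callable[[bytes], Awaitable[None]]


async def stream_body(
    body: AbstractAsyncContextManager[AsyncIterator[bytes]],
    send: Send,
) -> None:
    """Enter the body's context, forward every chunk, then exit the context."""
    async with body as chunks:
        async for chunk in chunks:
            await send(chunk)


def make_demo_body(
    events: list[str],
) -> AbstractAsyncContextManager[AsyncIterator[bytes]]:
    """Build a small body that records when its context is entered and exited."""

    @contextlib.asynccontextmanager
    async def body() -> AsyncIterator[AsyncIterator[bytes]]:
        async def chunks() -> AsyncIterator[bytes]:
            yield b"hello "
            yield b"world"

        events.append("enter")
        try:
            yield chunks()
        finally:
            events.append("exit")

    return body()


async def demo() -> tuple[list[bytes], list[str]]:
    sent: list[bytes] = []
    events: list[str] = []

    async def send(chunk: bytes) -> None:
        sent.append(chunk)

    await stream_body(make_demo_body(events), send)
    return sent, events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because setup and teardown bracket the iteration, any task group opened by
the body stays active for exactly as long as chunks are being sent.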
--
Ticket URL: <https://code.djangoproject.com/ticket/36916#comment:1>
Django <https://code.djangoproject.com/>
The Web framework for perfectionists with deadlines.