Looking into the internals of aiostream, flatmap (see advanced.py) is meant
to accept an async sequence of generators.
(Perhaps a function other than merge() has to be used.)

In which case, you could do something along the lines of:

async def tasks(some_queue):
    yield go()
    yield go()
    yield go()
    while True:
        try:
            # Closed is a placeholder here: asyncio.Queue has no close();
            # a sentinel item would serve the same purpose
            yield (await some_queue.get())
        except Closed:
            return

And then:

    q = ...
    async for v in flatmap(tasks(q)):
        if x: q.put(something_new())

The downside of this implementation is that deadlock is trivial: once the
queue is empty, tasks() blocks forever on some_queue.get(), so the async for
never ends. Then you'd probably try:

        try:
            yield some_queue.get_nowait()
        except asyncio.QueueEmpty:
            return

Which would not block, but could conceivably drop the last added task: the
queue may be polled just before the producer puts the new item on it.
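(With asyncio specifically, a plain `await queue.get()` never raises an
Empty exception -- it just waits; the non-blocking probe is
Queue.get_nowait(), which raises asyncio.QueueEmpty. A minimal sketch of
those semantics:)

```python
import asyncio

async def main():
    q = asyncio.Queue()
    q.put_nowait("task")

    # get_nowait() returns immediately when an item is available...
    first = q.get_nowait()

    # ...and raises QueueEmpty, rather than blocking, when none is
    try:
        q.get_nowait()
        outcome = "got item"
    except asyncio.QueueEmpty:
        outcome = "empty"
    return first, outcome

print(asyncio.run(main()))  # ('task', 'empty')
```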



A more straightforward implementation would be as follows; it's easier to
validate and verify:

async def tasks(backlog):
    # note: avoid a mutable default like backlog=[] -- it is shared
    # across calls, a classic Python pitfall
    while backlog:
        yield backlog.pop()

Usage:

    backlog = [go(), go(), go()]
    async for v in flatmap(tasks(backlog)):
        if x: backlog.append(go())

Which of course doesn't do what you want -- you want "all" tasks to start
executing at the same time and then change what "all" means :)
This implies that the task generator needs to async-block, and then
[magically] unblock when all tasks finish producing values (or are
cancelled).
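To make that concrete, here's a minimal pure-asyncio sketch (no aiostream;
go() modeled on the example quoted below): a naive flatten of the backlog
drains each generator to completion before starting the next, so nothing
interleaves:

```python
import asyncio

async def go():
    yield 0
    await asyncio.sleep(0)
    yield 50
    await asyncio.sleep(0)
    yield 100

async def tasks(backlog):
    while backlog:
        yield backlog.pop()

async def main():
    results = []
    # sequential flatten: each generator is fully drained
    # before the next one is even started
    async for gen in tasks([go(), go()]):
        async for v in gen:
            results.append(v)
    return results

print(asyncio.run(main()))  # [0, 50, 100, 0, 50, 100]
```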

I think aiostream has enough tools to put together what you want.

If I were to hand-craft it, it would look something like this (pseudocode
only; note the loop must run while *either* new generators may still arrive
*or* existing ones are still running -- with "and" it would never even
start, since generators begins empty):

async def myflatten(source):
    generators = []
    while generators or source-not-exhausted:
        item = await futures.wait(generators + [source],
                                  return_when=FIRST_COMPLETED)...
        if item is-a-generator:
            generators.append(item)
        else:
            yield item
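Fleshed out with real asyncio primitives (asyncio.wait with FIRST_COMPLETED
standing in for the futures.wait placeholder), a runnable version of that
sketch could look like this -- myflatten and the helper names are mine, not
aiostream APIs:

```python
import asyncio

async def myflatten(source):
    # Treat the source (which yields new generators) and every running
    # generator uniformly: keep one pending __anext__ task per iterator,
    # and race them all with FIRST_COMPLETED.
    source_it = source.__aiter__()
    pending = {asyncio.ensure_future(source_it.__anext__()): source_it}
    while pending:
        done, _ = await asyncio.wait(pending,
                                     return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            it = pending.pop(task)
            try:
                item = task.result()
            except StopAsyncIteration:
                continue  # this iterator (or the source) is exhausted
            if it is source_it:
                # a new generator arrived: start consuming it,
                # and keep listening for more
                new_it = item.__aiter__()
                pending[asyncio.ensure_future(new_it.__anext__())] = new_it
                pending[asyncio.ensure_future(source_it.__anext__())] = source_it
            else:
                yield item
                pending[asyncio.ensure_future(it.__anext__())] = it

async def go():
    yield 0
    await asyncio.sleep(0.01)
    yield 50
    await asyncio.sleep(0.01)
    yield 100

async def sources():
    yield go()
    yield go()

async def main():
    return [v async for v in myflatten(sources())]

print(sorted(asyncio.run(main())))  # [0, 0, 50, 50, 100, 100]
```

The loop ends naturally: the source's StopAsyncIteration stops re-arming
its task, each finished generator is dropped, and pending empties.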

On 18 July 2018 at 21:01, James Stidard <jamesstid...@gmail.com> wrote:

> Hi,
>
> I was wondering if anyone has had experience implementing a similar
> pattern to this or has alternative suggestions of how to achieve it.
>
> I have a bunch of async generators which I’d like to be able to merge into
> a single async generator I can iterate over. I found Vincent’s aiostream
> library which gives me this without too much effort:
>
> from asyncio import sleep, run
> from aiostream.stream import merge
>
> async def go():
>     yield 0
>     await sleep(1)
>     yield 50
>     await sleep(1)
>     yield 100
>
> async def main():
>     tasks = merge(go(), go(), go())
>
>     async for v in tasks:
>         print(v)
>
> if __name__ == '__main__':
>     run(main())
>
> However, I also would like to be able to add additional tasks into the
> list once I’ve started iterating the list so something more akin to:
>
> from asyncio import sleep, run
> from aiostream.stream import merge
>
> async def go():
>     yield 0
>     await sleep(1)
>     yield 50
>     await sleep(1)
>     yield 100
>
> async def main():
>     tasks = merge(go(), go(), go())
>
>     async for v in tasks:
>         if v == 50:
>             tasks.merge(go())
>         print(v)
>
> if __name__ == '__main__':
>     run(main())
>
> Has anyone been able to achieve something like this?
>
> p.s. I know appending to a list you’re iterating is bad practice, I assume
> the same would be true modifying this stream object, but think the example
> illustrates what I’m trying to achieve.
>
> Thanks,
> James
>
>
>
> _______________________________________________
> Async-sig mailing list
> Async-sig@python.org
> https://mail.python.org/mailman/listinfo/async-sig
> Code of Conduct: https://www.python.org/psf/codeofconduct/
>
>