Looking into the internals of aiostream, it's meant to accept an async sequence of generators, see advanced.py flatmap. (perhaps some other function has to be used than merge().)
In which case, you could do something along the lines of: async def tasks(some_queue): yield go() yield go() yield go() while True: try: yield (await some_queue.get()) except Closed: return And then: q = ... async for v in flatmap(tasks(q)): if x: q.put(something_new()) The downside of this implementation is that deadlock is trivial. Then you'd probably try: try: yield (await some_queue.get()) except Empty: return Which would not block, but could conceivably drop last added task. Then more straightforward implementation would be as follows, it's easier to validate and verify: async def tasks(backlog=[]): while backlog: yield backlog.pop() usage: backlog = [go(), go(), go()] async for v in flatmap(tasks(backlog)): if x: backlog.append(go()) Which of course doesn't do what you want -- you want "all" tasks to start executing at the same time and then change what "all" means :) This implies that task generator needs to async-block, and then [magically] unblock when all tasks finish producing values (or are cancelled). I think aiostream has enough tools to put together what you want. If I were to hand-craft it, it would look something like this (pseudocode only): myflatten(source): generators = [] while generators and source-not-exhausted: item = await futures.wait(generators + [source], return_when=FIRST_COMPLETED)... if item is-a-generator: generators.append(item) else: yield item On 18 July 2018 at 21:01, James Stidard <jamesstid...@gmail.com> wrote: > Hi, > > I was wondering if anyone has had experience implementing a similar > pattern to this or has alternative suggestions of how to achieve it. > > I have a bunch of async generators which I’d like to be able to merge into > a single async generator I can iterate over. I found Vincent’s aiostream > library which gives me this without too much effort: > > from asyncio import sleep, run > from aiostream.stream import merge > > async def go(): > yield 0 > await sleep(1) > yield 50 > await sleep(1) > yield 100 > > async def main(): > tasks = merge(go(), go(), go()) > > async for v in tasks: > print(v) > > if __name__ == '__main__': > run(main()) > > However, I also would like to be able to add additional tasks into the > list once I’ve started iterating the list so something more akin to: > > from asyncio import sleep, run > from aiostream.stream import merge > > async def go(): > yield 0 > await sleep(1) > yield 50 > await sleep(1) > yield 100 > > async def main(): > tasks = merge(go(), go(), go()) > > async for v in tasks: > If v == 50: > tasks.merge(go()) > print(v) > > if __name__ == '__main__': > run(main()) > > Has anyone been able to achieve something like this? > > p.s. I know appending to a list you’re iterating is bad practice, I assume > the same would be true modifying this stream object, but think the example > illustrates what I’m trying to achieve. > > Thanks, > James > > > > _______________________________________________ > Async-sig mailing list > Async-sig@python.org > https://mail.python.org/mailman/listinfo/async-sig > Code of Conduct: https://www.python.org/psf/codeofconduct/ > >
_______________________________________________ Async-sig mailing list Async-sig@python.org https://mail.python.org/mailman/listinfo/async-sig Code of Conduct: https://www.python.org/psf/codeofconduct/