The task that led me down this path in the first place was running a bunch of
slow queries to populate database tables. A few of the tables depend on others
and thus need to wait until their sources have been populated, while the rest
are free to run fully in parallel.
Imagine this setup:
Tables A, B: no deps
Tables C, D: depend on A, B
Table E: no deps
Here's how I would like to write it:
`e_task = Task.async(fn -> populate("E") end)
[a, b] = Task.await_many([
  Task.async(fn -> populate("A") end),
  Task.async(fn -> populate("B") end)
], 10_000)
[c, d, e] = Task.await_many([
  Task.async(fn -> populate("C", a, b) end),
  Task.async(fn -> populate("D", a, b) end),
  e_task
], 10_000)
do_stuff(a, b, c, d, e)
`
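For anyone who wants to run this flow, here is a minimal self-contained sketch. It assumes an Elixir release where `Task.await_many/2` is available (1.11 or later). `PopulateSketch` and its `populate` functions are hypothetical stand-ins that sleep instead of running real queries.

```elixir
defmodule PopulateSketch do
  # Hypothetical stand-in for a slow query filling a table with no dependencies.
  def populate(table) do
    Process.sleep(100)
    {:populated, table, []}
  end

  # Hypothetical stand-in for a table that needs two source tables first.
  def populate(table, {:populated, src1, _}, {:populated, src2, _}) do
    Process.sleep(100)
    {:populated, table, [src1, src2]}
  end
end

# E has no dependencies, so start it first and collect it at the very end.
e_task = Task.async(fn -> PopulateSketch.populate("E") end)

# A and B run in parallel; both must finish before C and D can start.
[a, b] =
  Task.await_many(
    [
      Task.async(fn -> PopulateSketch.populate("A") end),
      Task.async(fn -> PopulateSketch.populate("B") end)
    ],
    10_000
  )

# C and D run in parallel with each other and with whatever is left of E.
[c, d, e] =
  Task.await_many(
    [
      Task.async(fn -> PopulateSketch.populate("C", a, b) end),
      Task.async(fn -> PopulateSketch.populate("D", a, b) end),
      e_task
    ],
    10_000
  )

IO.inspect([a, b, c, d, e], label: "populated")
```

Each `await_many` call applies one timeout to the whole group, so the second call can wait up to 10 seconds for C, D, and E together.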
On Mon, Jan 13, 2020, at 1:13 PM, José Valim wrote:
> Hi Ian, thanks for the proposal.
>
> Quick question, what is the issue with the approach below:
> `prep = [
>   fn -> preheat_oven(350) end,
>   fn -> grease_pan(pan) end,
>   fn -> mix_batter() end
> ]`
> `[{:ok, oven}, {:ok, pan}, {:ok, mix}] =
>   prep |> Task.async_stream(& &1.()) |> Enum.to_list()`
> Also, can you provide a concrete example of where async_many is necessary? It
> will help guide the discussion.
>
>
> *José Valim*
> www.plataformatec.com.br
> Founder and Director of R&D
>
>
> On Mon, Jan 13, 2020 at 7:40 PM Ian Young <[email protected]> wrote:
>> Background
>> The `Task` module currently contains three functions that synchronously
>> retrieve results from asynchronous tasks:
>> * `Task.await` <https://hexdocs.pm/elixir/Task.html#await/2>: Blocks
>> waiting for a reply from a single task. Accepts a timeout value. If the task
>> is successful, returns `reply`. If the task dies or the timeout is reached,
>> exits with the corresponding reason.
>> * `Task.yield` <https://hexdocs.pm/elixir/Task.html#yield/2>: Blocks
>> waiting for a reply from a single task. Accepts a timeout value. If the task
>> is successful, returns `{:ok, reply}`. If the task dies or the timeout is
>> reached, returns `{:exit, reason}` or `nil`.
>> * `Task.yield_many` <https://hexdocs.pm/elixir/Task.html#yield_many/2>:
>> Blocks waiting for replies from a list of tasks. Accepts a timeout value.
>> When all tasks are complete or the timeout is reached, returns a list of
>> `{task, result}` tuples, where `result` is `{:ok, reply}` for successful
>> tasks, `{:exit, reason}` for dead tasks, and `nil` for timed-out tasks.
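To make the difference between these return shapes concrete, here is a small sketch that runs against trivial tasks. The task bodies and timeout values are arbitrary choices for illustration.

```elixir
# Task.await/2 returns the bare reply.
awaited = Task.await(Task.async(fn -> 42 end), 1_000)

# Task.yield/2 wraps a successful reply in {:ok, reply}.
yielded = Task.yield(Task.async(fn -> 42 end), 1_000)

# A task that outlives the timeout makes Task.yield/2 return nil. The task
# keeps running, so shut it down explicitly.
slow = Task.async(fn -> Process.sleep(5_000) end)
timed_out = Task.yield(slow, 50)
Task.shutdown(slow, :brutal_kill)

# Task.yield_many/2 returns {task, result} pairs in the order the tasks were given.
fast = Task.async(fn -> :done end)
stuck = Task.async(fn -> Process.sleep(5_000) end)
pairs = Task.yield_many([fast, stuck], 100)
results = Enum.map(pairs, fn {_task, result} -> result end)
Task.shutdown(stuck, :brutal_kill)

IO.inspect({awaited, yielded, timed_out, results})
```

Note that neither `yield` nor `yield_many` stops a task that timed out; the caller has to decide what to do with it.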
>> Additionally, the `Task` module contains one function that handles both
>> creating asynchronous tasks and retrieving the results:
>> * `Task.async_stream` <https://hexdocs.pm/elixir/Task.html#async_stream/3>:
>> Asynchronously applies a given function to each element in a given
>> enumerable. Accepts a timeout value that is applied to each task separately.
>> Returns an enumerable that emits results, blocking as needed. If a task
>> dies, exits with the reason. When tasks complete, emits `{:ok, reply}`. When
>> tasks reach the timeout, either exits or emits `{:exit, :timeout}`,
>> depending on configuration options.
>> The discussion that eventually became `Task.async_stream`
>> <https://github.com/elixir-lang/elixir/issues/5033> included an alternative
>> suggestion of `Task.async_many` and `Task.await_many`. In the end
>> <https://github.com/elixir-lang/elixir/issues/5033#issuecomment-256893906>,
>> `async_stream` was chosen because it provides the ability to bound the
>> maximum concurrency and stream results, making it the most robust way to
>> handle intensive processing over an enumerable.
>> Proposal
>> I propose this addition to retrieve results from multiple asynchronous tasks
>> while adhering to `await` behavior:
>> `Task.await_many(tasks, timeout \\ 5000)`
>> Blocks waiting for replies from a list of tasks. If the tasks complete,
>> returns a list of replies. If the timeout is reached, exits with `:timeout`.
>> If a task dies, exits with the reason given by the task.
>> `Task.await` (together with `Task.async`) provides a simple solution that
>> can be used as a drop-in replacement for synchronous code. The addition of
>> `Task.await_many` will provide the building blocks for many common
>> asynchronous flows, with unsurprising default behavior. There are other ways
>> to accomplish the same thing, but `Task.await_many` provides a similar value
>> proposition to `Task.await`: it is simple to use, is the right amount of
>> tooling for simple use cases, and doesn't require extra code or reasoning
>> about complicated async workflow concerns.
>> It fits well with the existing feature set, since it essentially fills a gap
>> in the collection of related functions (`yield_many` is to `yield` as
>> `await_many` is to `await`). It should be very easy for people to use and understand,
>> provided they are familiar with the other `Task` functions.
>> As a toy example, consider baking a cake, which involves heterogeneous tasks,
>> only some of which can run in parallel:
>> `oven_prep = Task.async(fn -> preheat_oven(350) end)
>> {pan, bowl} = wash_dishes()
>> frosting_prep = Task.async(fn -> make_frosting(bowl, :pink) end)
>> [_, greased_pan, batter] = Task.await_many([
>>   oven_prep,
>>   Task.async(fn -> grease_pan(pan) end),
>>   Task.async(fn -> mix_batter() end)
>> ], 600_000)
>> baking = Task.async(fn ->
>>   baked_cake = bake(batter, greased_pan, 30)
>>   cool(baked_cake, 10)
>> end)
>> eat_dinner()
>> [cooled_cake, frosting] = Task.await_many([baking, frosting_prep])
>> cooled_cake
>> |> frost(frosting)
>> |> eat()
>> `
>> Alternatives
>> Why not `Task.await`?
>> A common pattern suggested online [1
>> <https://stackoverflow.com/a/42330810/58418>][2
>> <https://elixirforum.com/t/proper-use-of-task-async/13259/2>][3
>> <https://elixirforum.com/t/collecting-concurrent-tasks-results/3865>] is to
>> enumerate calls to `Task.await`:
>> `Enum.map(tasks, &Task.await(&1, timeout))`
>> Because the `await` calls happen sequentially, the timeout is reset for each
>> element of the list. This can lead to unexpected and likely unwanted
>> behavior in which this call may block much longer than the specified timeout.
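A quick way to see the timeout resetting is to stagger a few tasks so that each individual wait stays under the timeout while the total does not. This is only a sketch; the sleep durations and the 200 ms timeout are arbitrary values picked for the demonstration.

```elixir
timeout = 200
started = System.monotonic_time(:millisecond)

# Three tasks that finish roughly 100 ms apart.
tasks =
  for ms <- [100, 200, 300] do
    Task.async(fn ->
      Process.sleep(ms)
      ms
    end)
  end

# Each Task.await/2 call gets a fresh 200 ms budget, so no call times out
# even though the whole map takes about 300 ms.
replies = Enum.map(tasks, &Task.await(&1, timeout))

elapsed = System.monotonic_time(:millisecond) - started
IO.puts("replies=#{inspect(replies)} elapsed=#{elapsed}ms timeout=#{timeout}ms")
```

With more tasks the gap grows: in the worst case the call can block for close to the number of tasks multiplied by the timeout.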
>> Why not `Task.yield_many`?
>> `Task.yield_many` works fine for this situation, but it adheres to the
>> semantics of `Task.yield` rather than `Task.await`. It returns a tuple
>> instead of the bare reply, and on failure it does not exit or kill
>> still-running tasks. To achieve the behavior of `await`, you must write
>> something like this to handle the various possible results:
>> `Task.yield_many(tasks)
>> |> Enum.map(fn {task, result} ->
>>   case result do
>>     nil ->
>>       # Maybe unnecessary since we are exiting?
>>       Task.shutdown(task, :brutal_kill)
>>       exit(:timeout)
>>     {:exit, reason} ->
>>       exit(reason)
>>     {:ok, result} ->
>>       result
>>   end
>> end)`
>> Rather than expecting every developer to write this boilerplate (and not
>> make any mistakes in doing so), I think it would be better to provide a
>> construct in the standard library.
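To try the boilerplate out, it can be packaged as a helper. The sketch below does that under a hypothetical name, `AwaitSketch.await_many/2`, which is not part of the standard library, and exercises the success and timeout paths. The exit is caught only so the script can print the reason.

```elixir
defmodule AwaitSketch do
  # Hypothetical helper: await-style semantics built on Task.yield_many/2.
  def await_many(tasks, timeout \\ 5000) do
    tasks
    |> Task.yield_many(timeout)
    |> Enum.map(fn
      {_task, {:ok, reply}} ->
        reply

      {_task, {:exit, reason}} ->
        exit(reason)

      {task, nil} ->
        # Stop the straggler before exiting, in case the caller catches the exit.
        Task.shutdown(task, :brutal_kill)
        exit(:timeout)
    end)
  end
end

# Success: bare replies, in task order.
replies =
  AwaitSketch.await_many([
    Task.async(fn -> :a end),
    Task.async(fn -> :b end)
  ])

# Timeout: the caller exits with :timeout.
outcome =
  try do
    AwaitSketch.await_many([Task.async(fn -> Process.sleep(5_000) end)], 50)
  catch
    :exit, reason -> {:exited, reason}
  end

IO.inspect({replies, outcome})
```

Like the snippet it is based on, this only shuts down the first timed-out task it encounters. Any other stragglers are left to the link with the caller.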
>> Why not `Task.async_stream`?
>> `Task.async_stream` is great for enumerating the same expensive operation
>> across a list of inputs, and it absolutely should be used for that. However,
>> it is not well-suited to situations where the collection of tasks is less
>> uniform. Consider the cake example:
>> `prep = [
>>   Task.async(fn -> preheat_oven(350) end),
>>   Task.async(fn -> grease_pan(pan) end),
>>   Task.async(fn -> mix_batter() end)
>> ]`
>> This would be a very awkward fit for `async_stream`. It is a specialized
>> tool that should not be applied in a generalized way. In addition,
>> `async_stream` has its own return values and exit behavior, which do not
>> match those of `await`.
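For reference, this is roughly what the `async_stream` return values look like when heterogeneous work is forced through it. The atoms stand in for the cake steps and the timings are arbitrary, so treat it as a sketch.

```elixir
# Heterogeneous work has to be wrapped in zero-arity functions first.
prep = [fn -> :oven_ready end, fn -> :pan_greased end, fn -> :batter_mixed end]

# Every reply comes back wrapped in {:ok, reply}.
stream_results =
  prep
  |> Task.async_stream(fn job -> job.() end)
  |> Enum.to_list()

# With on_timeout: :kill_task, a slow element is emitted as {:exit, :timeout}
# instead of exiting the caller. The timeout applies to each element separately.
timeout_results =
  [10, 2_000]
  |> Task.async_stream(
    fn ms ->
      Process.sleep(ms)
      ms
    end,
    timeout: 300,
    on_timeout: :kill_task
  )
  |> Enum.to_list()

IO.inspect(stream_results, label: "stream_results")
IO.inspect(timeout_results, label: "timeout_results")
```

Getting back to the bare replies that `await` would give means unwrapping every tuple and deciding what to do with each `{:exit, reason}` by hand.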
>> One potential harm of adding `Task.await_many` is that people might be
>> tempted to use it when they would be better off using `Task.async_stream`. I
>> believe this can be mitigated with proper documentation.
>> Why not `GenStage`?
>> `GenStage` <https://hexdocs.pm/gen_stage/GenStage.html> provides powerful
>> and flexible abstractions for handling asynchronous flows. For applications
>> that have complicated needs, this is a great tool. Often, though, we have
>> much simpler needs and applying `GenStage` to the problem would be massive
>> overkill.
>> `Task.await` is easy to use and easy to reason about. The goal of
>> `Task.await_many` is the same. It’s okay if it doesn’t cover every possible
>> use case, as long as it covers the ones we most commonly encounter in a way
>> that doesn’t encourage us to make mistakes.
>>
>>
>> --
>> You received this message because you are subscribed to the Google Groups
>> "elixir-lang-core" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected].
>> To view this discussion on the web visit
>> https://groups.google.com/d/msgid/elixir-lang-core/16e0e909-6ce5-44e6-9a96-738dfc352015%40www.fastmail.com
>>
>> <https://groups.google.com/d/msgid/elixir-lang-core/16e0e909-6ce5-44e6-9a96-738dfc352015%40www.fastmail.com?utm_medium=email&utm_source=footer>.