The task that led me down this path in the first place involves running a 
number of slow queries to populate database tables. Some of the tables depend 
on others and therefore need to wait until their sources have been populated, 
while others are free to run fully in parallel.

Imagine this setup:

Tables A, B: no deps
Tables C, D: depend on A, B
Table E: no deps

Here's how I would like to write it:

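```elixir
# E has no dependencies, so start it right away and collect it later.
e_task = Task.async(fn -> populate("E") end)

# A and B have no dependencies either; wait for both before moving on.
[a, b] = Task.await_many([
  Task.async(fn -> populate("A") end),
  Task.async(fn -> populate("B") end)
], 10_000)

# C and D need A and B; E has been running in the background all along.
[c, d, e] = Task.await_many([
  Task.async(fn -> populate("C", a, b) end),
  Task.async(fn -> populate("D", a, b) end),
  e_task
], 10_000)

do_stuff(a, b, c, d, e)
```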

On Mon, Jan 13, 2020, at 1:13 PM, José Valim wrote:
> Hi Ian, thanks for the proposal.
> 
> Quick question, what is the issue with the approach below:
```elixir
prep = [
  fn -> preheat_oven(350) end,
  fn -> grease_pan(pan) end,
  fn -> mix_batter() end,
]

[{:ok, oven}, {:ok, pan}, {:ok, mix}] =
  prep |> Task.async_stream(& &1.()) |> Enum.to_list()
```
> Also, can you provide a concrete example of where await_many is necessary? It 
> will help guide the discussion.
> 
> 
> *José Valim*
> www.plataformatec.com.br
> Founder and Director of R&D
> 
> 
> On Mon, Jan 13, 2020 at 7:40 PM Ian Young <[email protected]> wrote:
>> 
>> Background

>> The `Task` module currently contains three functions that synchronously 
>> retrieve results from asynchronous tasks:

>>  * `Task.await` <https://hexdocs.pm/elixir/Task.html#await/2>: Blocks 
>> waiting for a reply from a single task. Accepts a timeout value. If the task 
>> is successful, returns `reply`. If the task dies or the timeout is reached, 
>> exits with the corresponding reason.
>>  * `Task.yield` <https://hexdocs.pm/elixir/Task.html#yield/2>: Blocks 
>> waiting for a reply from a single task. Accepts a timeout value. If the task 
>> is successful, returns `{:ok, reply}`. If the task dies or the timeout is 
>> reached, returns `{:exit, reason}` or `nil`.
>>  * `Task.yield_many` <https://hexdocs.pm/elixir/Task.html#yield_many/2>: 
>> Blocks waiting for replies from a list of tasks. Accepts a timeout value. 
>> When all tasks are complete or the timeout is reached, returns a list of 
>> `{task, result}` tuples, where `result` is `{:ok, reply}` for successful 
>> tasks, `{:exit, reason}` for dead tasks, or `nil` for tasks that timed out.
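A quick way to see these three return shapes side by side is the following sketch, which can be pasted into `iex`. The task bodies (`1 + 1`, `:done`, `:fast`, `:slow`) are placeholders chosen only for illustration.

```elixir
# Task.await/2: returns the bare reply (and exits on failure or timeout).
task = Task.async(fn -> 1 + 1 end)
awaited = Task.await(task, 1_000)

# Task.yield/2: wraps the reply in {:ok, reply}; it would return nil on timeout.
task = Task.async(fn -> :done end)
yielded = Task.yield(task, 1_000)

# Task.yield_many/2: one {task, result} pair per task, in the order given.
fast = Task.async(fn -> :fast end)

slow =
  Task.async(fn ->
    Process.sleep(1_000)
    :slow
  end)

[{^fast, fast_result}, {^slow, slow_result}] = Task.yield_many([fast, slow], 100)

# The slow task is still running; yield_many does not stop it, so we do.
Task.shutdown(slow, :brutal_kill)

IO.inspect({awaited, yielded, fast_result, slow_result})
# => {2, {:ok, :done}, {:ok, :fast}, nil}
```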
>> Additionally, the `Task` module contains one function that handles both 
>> creating asynchronous tasks and retrieving the results:

>>  * `Task.async_stream` <https://hexdocs.pm/elixir/Task.html#async_stream/3>: 
>> Asynchronously applies a given function to each element in a given 
>> enumerable. Accepts a timeout value that is applied to each task separately. 
>> Returns an enumerable that emits results, blocking as needed. If a task 
>> dies, exits with the reason. When tasks complete, emits `{:ok, reply}`. When 
>> tasks reach the timeout, either exits or emits `{:exit, :timeout}`, 
>> depending on the `:on_timeout` option.
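The per-task timeout and the `:on_timeout` option can be illustrated with a small sketch. The sleep durations are arbitrary values picked so that exactly one element exceeds its 100 ms budget; with `on_timeout: :kill_task`, the stream emits `{:exit, :timeout}` for that element instead of exiting the caller.

```elixir
# Each element gets its own 100 ms budget. The 500 ms element is killed and
# reported as {:exit, :timeout}; results keep the input order (ordered: true
# is the default).
results =
  [10, 500, 20]
  |> Task.async_stream(
    fn ms ->
      Process.sleep(ms)
      ms
    end,
    timeout: 100,
    on_timeout: :kill_task
  )
  |> Enum.to_list()

IO.inspect(results)
# => [{:ok, 10}, {:exit, :timeout}, {:ok, 20}]
```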
>> The discussion that eventually became `Task.async_stream` 
>> <https://github.com/elixir-lang/elixir/issues/5033> included an alternative 
>> suggestion of `Task.async_many` and `Task.await_many`. In the end 
>> <https://github.com/elixir-lang/elixir/issues/5033#issuecomment-256893906>, 
>> `async_stream` was chosen because it provides the ability to bound the 
>> maximum concurrency and stream results, making it the most robust way to 
>> handle intensive processing over an enumerable.

>> Proposal

>> I propose this addition to retrieve results from multiple asynchronous tasks 
>> while adhering to `await` behavior:

>> `Task.await_many(tasks, timeout \\ 5000)`
>> Blocks waiting for replies from a list of tasks. If all tasks complete 
>> within the timeout, returns a list of replies in the same order as `tasks`. 
>> If the timeout is reached, exits with `:timeout`. If a task dies, exits with 
>> the reason given by the task.
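One way to approximate the proposed semantics with the existing API is sketched below, assuming a single deadline shared by the whole list. The module name `AwaitManySketch` is hypothetical, and this is an illustration of the behavior described above rather than a reference implementation.

```elixir
defmodule AwaitManySketch do
  # Hypothetical helper: awaits every task against ONE shared deadline,
  # instead of giving each Task.await/2 call a fresh timeout.
  def await_many(tasks, timeout \\ 5_000) when is_integer(timeout) and timeout >= 0 do
    deadline = System.monotonic_time(:millisecond) + timeout

    Enum.map(tasks, fn task ->
      # Whatever time is left before the shared deadline (never negative).
      remaining = max(deadline - System.monotonic_time(:millisecond), 0)
      # Task.await/2 returns the bare reply and exits on timeout or failure.
      Task.await(task, remaining)
    end)
  end
end

tasks =
  for ms <- [30, 10, 20] do
    Task.async(fn ->
      Process.sleep(ms)
      ms
    end)
  end

# Replies come back in the same order as the tasks were given.
replies = AwaitManySketch.await_many(tasks, 1_000)
IO.inspect(replies)
# => [30, 10, 20]
```

Because it delegates to `Task.await/2`, this sketch exits with whatever reason `Task.await/2` uses on timeout, which is not necessarily the bare `:timeout` described in the proposal, and it only accepts integer timeouts (not `:infinity`).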

>> `Task.await` (together with `Task.async`) provides a simple solution that 
>> can be used as a drop-in replacement for synchronous code. The addition of 
>> `Task.await_many` will provide the building blocks for many common 
>> asynchronous flows, with unsurprising default behavior. There are other ways 
>> to accomplish the same thing, but `Task.await_many` provides a similar value 
>> proposition to `Task.await`: it is simple to use, is the right amount of 
>> tooling for simple use cases, and doesn't require extra code or reasoning 
>> about complicated async workflow concerns.

>> It fits well with the existing feature set, since it essentially fills a gap 
>> in the collection of related functions (`yield_many` is to `yield` as 
>> `await_many` is to `await`). It should be very easy for people to use and 
>> understand, provided they are familiar with the other `Task` functions.

>> As a toy example, consider baking a cake, a process made up of heterogeneous, 
>> sometimes parallelizable tasks:

```elixir
oven_prep = Task.async(fn -> preheat_oven(350) end)

{pan, bowl} = wash_dishes()

frosting_prep = Task.async(fn -> make_frosting(bowl, :pink) end)

[_, greased_pan, batter] = Task.await_many([
  oven_prep,
  Task.async(fn -> grease_pan(pan) end),
  Task.async(fn -> mix_batter() end),
], 600_000)

baking = Task.async(fn ->
  baked_cake = bake(batter, greased_pan, 30)
  cool(baked_cake, 10)
end)

eat_dinner()

[cooled_cake, frosting] = Task.await_many([baking, frosting_prep])

cooled_cake
|> frost(frosting)
|> eat()
```
>> Alternatives

>> Why not `Task.await`?

>> A common pattern suggested online [1 
>> <https://stackoverflow.com/a/42330810/58418>][2 
>> <https://elixirforum.com/t/proper-use-of-task-async/13259/2>][3 
>> <https://elixirforum.com/t/collecting-concurrent-tasks-results/3865>] is to 
>> enumerate calls to `Task.await`:

>> `Enum.map(tasks, &Task.await(&1, timeout))`
>> Because the `await` calls happen sequentially, the timeout is reset for each 
>> element of the list. This can lead to unexpected and likely unwanted 
>> behavior: the call as a whole may block much longer than the specified 
>> timeout, up to roughly the timeout multiplied by the number of tasks.
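The timeout reset can be observed directly. In this sketch, the three sleep durations and the 150 ms timeout are arbitrary values chosen so that each individual `Task.await/2` call finishes within its own budget while the loop as a whole takes about twice as long.

```elixir
# All three tasks start at (almost) the same time and reply after ~100, ~200
# and ~300 ms respectively.
tasks =
  for ms <- [100, 200, 300] do
    Task.async(fn ->
      Process.sleep(ms)
      ms
    end)
  end

started = System.monotonic_time(:millisecond)

# Each Task.await/2 call gets a fresh 150 ms budget and only waits ~100 ms,
# so none of them times out...
replies = Enum.map(tasks, &Task.await(&1, 150))

# ...yet the loop as a whole blocks for ~300 ms, twice the "timeout".
elapsed = System.monotonic_time(:millisecond) - started

IO.inspect({replies, elapsed})
```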

>> Why not `Task.yield_many`?

>> `Task.yield_many` works fine for this situation, but it adheres to the 
>> semantics of `Task.yield` rather than `Task.await`. It returns a tuple 
>> instead of the bare reply, and on failure it does not exit or kill 
>> still-running tasks. To achieve the behavior of `await`, you must write 
>> something like this to handle the various possible results:

```elixir
Task.yield_many(tasks)
|> Enum.map(fn {task, result} ->
  case result do
    nil ->
      # Maybe unnecessary, since we are about to exit?
      Task.shutdown(task, :brutal_kill)
      exit(:timeout)

    {:exit, reason} ->
      exit(reason)

    {:ok, reply} ->
      reply
  end
end)
```
>> Rather than expecting every developer to write this boilerplate (and not 
>> make any mistakes in doing so), I think it would be better to provide a 
>> construct in the standard library.

>> Why not `Task.async_stream`?

>> `Task.async_stream` is great for enumerating the same expensive operation 
>> across a list of inputs, and it absolutely should be used for that. However, 
>> it is not well-suited to situations where the collection of tasks is less 
>> uniform. Consider the cake example:

```elixir
prep = [
  Task.async(fn -> preheat_oven(350) end),
  Task.async(fn -> grease_pan(pan) end),
  Task.async(fn -> mix_batter() end),
]
```
>> This would be a very awkward fit for `async_stream`. It is a specialized 
>> tool that should not be applied in a generalized way. In addition, 
>> `async_stream` has its own return values and exit behavior that do not 
>> match those of `await`.

>> One potential harm of adding `Task.await_many` is that people might be 
>> tempted to use it when they would be better off using `Task.async_stream`. I 
>> believe this can be mitigated with proper documentation.

>> Why not `GenStage`?

>> `GenStage` <https://hexdocs.pm/gen_stage/GenStage.html> provides powerful 
>> and flexible abstractions for handling asynchronous flows. For applications 
>> that have complicated needs, this is a great tool. Often, though, we have 
>> much simpler needs and applying `GenStage` to the problem would be massive 
>> overkill.

>> `Task.await` is easy to use and easy to reason about. The goal of 
>> `Task.await_many` is the same. It’s okay if it doesn’t cover every possible 
>> use case, as long as it covers the ones we most commonly encounter in a way 
>> that doesn’t encourage us to make mistakes.

>> 
>> 

>> --
>>  You received this message because you are subscribed to the Google Groups 
>> "elixir-lang-core" group.
>>  To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected].
>>  To view this discussion on the web visit 
>> https://groups.google.com/d/msgid/elixir-lang-core/16e0e909-6ce5-44e6-9a96-738dfc352015%40www.fastmail.com
>>  
>> <https://groups.google.com/d/msgid/elixir-lang-core/16e0e909-6ce5-44e6-9a96-738dfc352015%40www.fastmail.com?utm_medium=email&utm_source=footer>.
> 

