something like `zip_inputs: true` might work. I usually update a container in the task and return it, so that it holds both the input and the output. But if we had something like zip_inputs, the task itself could just return the calculated value, and it would be up to the pipeline to decide what to do with it. I actually like it.

What would the output look like?

[{:ok, _input_, output}]
[{:error, _input_, :timeout}]

or something along the lines of

[{_input_, {:ok, output}}]
[{_input_, {:error, :timeout}}]

On Friday, 18 February 2022 at 11:08:25 UTC+1 José Valim wrote:
> Maybe we should have a zip_inputs: true or similar flag and we attach the
> input to all "ok" and "exit" tuples.
>
> On Fri, Feb 18, 2022 at 11:03 AM vtm <vtmil...@gmail.com> wrote:
>
>> Hi Jose and Juan.
>> I think there is a problem: you need `ordered: true` to get a
>> correct result from Enum.zip.
>> Also, you might have a big list and just want to use
>> `big_list |> Task.async_stream(params) |> Stream.filter(&is_error/1) |>
>> Stream.map(&handle_error/1) |> Stream.run()`
>> just to capture the errors with their values and do some work.
>>
>> On Fri, Feb 18, 2022 at 12:48, José Valim <jose....@dashbit.co> wrote:
>>
>>> Hi Juan,
>>>
>>> In your example a simple Enum.zip would suffice:
>>>
>>> list = [1, 2, 3, 4]
>>>
>>> list
>>> |> Task.async_stream(
>>>   fn entry ->
>>>     if entry == 3, do: :timer.sleep(2000)
>>>     entry * entry
>>>   end,
>>>   timeout: 1000,
>>>   on_timeout: :kill_task
>>> )
>>> |> Enum.zip(list)
>>>
>>> Can you provide an example where zipping would not be possible or too
>>> cumbersome?
>>>
>>> On Fri, Feb 18, 2022 at 10:38 AM Juan Peri <eternop...@gmail.com> wrote:
>>>
>>>> In these past few years I've found myself needing to execute several
>>>> async tasks (mostly to fetch remote resources) in scripts, and
>>>> wanting to do something with the ones that fail.
>>>> With the current implementation of async_stream, there is no way to
>>>> know which ones failed, as the output looks like the following:
>>>> ```
>>>> [1, 2, 3, 4]
>>>> |> Task.async_stream(
>>>>   fn entry ->
>>>>     if entry == 3, do: :timer.sleep(2000)
>>>>     entry * entry
>>>>   end,
>>>>   timeout: 1000,
>>>>   on_timeout: :kill_task
>>>> )
>>>> |> Enum.to_list()
>>>> [ok: 1, ok: 4, exit: :timeout, ok: 16]
>>>> ```
>>>> That would force me to get creative with Stream.zip, or hand-roll my
>>>> own solution.
>>>>
>>>> @vtm in the Elixir Slack pointed me to the commit
>>>> https://github.com/elixir-lang/elixir/commit/c94327cc4 in which such
>>>> functionality was removed because of backwards compatibility.
>>>> The commit that introduced it was
>>>> https://github.com/elixir-lang/elixir/commit/6c1fe08a676f1cacb7ee66dd56251b8a48a02d63#diff-0e81c1197153b352765d1d43ca57817901d19813a220e95a2cd2e70f3cfaa279R376
>>>> when adding the :ordered parameter to Task.async_stream.
>>>>
>>>> Would you accept a PR that exposes this functionality again, but
>>>> behind a new opt-in parameter that is false by default, in order to keep
>>>> backwards compatibility?
>>>>
>>>> Using the previous example, it would look something like:
>>>> ```
>>>> [1, 2, 3, 4]
>>>> |> Task.async_stream(
>>>>   ...,
>>>>   timeout: 1000,
>>>>   on_timeout: :exit_task_with_value
>>>> )
>>>> |> Enum.to_list()
>>>> [{:ok, 1}, {:ok, 4}, {:exit, :timeout, 3}, {:ok, 16}]
>>>> ```
>>>> I'm not sold on the flag :exit_task_with_value; it could be a separate
>>>> parameter as well, as long as it's opt-in.
>>>>
>>>> Thanks
>>>> Juan
>>>> "Not all that is gold glitters... nor is every wanderer lost"
>>>>
>>>> --
>>>> You received this message because you are subscribed to the Google
>>>> Groups "elixir-lang-core" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to elixir-lang-co...@googlegroups.com.
>>>> To view this discussion on the web visit
>>>> https://groups.google.com/d/msgid/elixir-lang-core/CAPx9ufN-kdNYuAJri_z9pxQ-0yopvSdNUVBLpZ1G6fTHPk1BDw%40mail.gmail.com.
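
For reference, the `{input, result}` shape discussed at the top of this thread can be approximated with the current API by zipping the results back onto the inputs, as José suggested. The following is only a rough sketch: the module `ZipInputsSketch` and the function `async_stream_with_inputs/3` are hypothetical names made up for illustration, not part of `Task`. It forces `ordered: true` (vtm's point about zipping) and assumes the enumerable can safely be traversed twice, which holds for a list but not necessarily for a stream with side effects.

```elixir
defmodule ZipInputsSketch do
  # Hypothetical helper for illustration only; not part of Elixir's Task module.
  # Pairs every result of Task.async_stream/3 with the input that produced it.
  def async_stream_with_inputs(enumerable, fun, opts \\ []) do
    # Zipping is only correct when results come back in input order,
    # so :ordered is forced to true regardless of what the caller passed.
    opts = Keyword.put(opts, :ordered, true)

    enumerable
    |> Task.async_stream(fun, opts)
    # Assumption: the enumerable can be traversed a second time (e.g. a list).
    |> Stream.zip(enumerable)
    |> Stream.map(fn {result, input} -> {input, result} end)
  end
end

results =
  [1, 2, 3, 4]
  |> ZipInputsSketch.async_stream_with_inputs(
    fn entry ->
      # Entry 3 sleeps past the timeout, so its task is killed.
      if entry == 3, do: :timer.sleep(1000)
      entry * entry
    end,
    timeout: 200,
    on_timeout: :kill_task
  )
  |> Enum.to_list()

# results is:
# [{1, {:ok, 1}}, {2, {:ok, 4}}, {3, {:exit, :timeout}}, {4, {:ok, 16}}]
```

This keeps the task function free of any bookkeeping about its own input, at the cost of giving up `ordered: false`, which is the case a built-in flag would still be needed for.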