I needed this too some time ago. It's not very hard to write yourself. 
The tricky parts of these functions are handling exceptions and 
orchestrating shutdown.

I put my version up in a gist with some other async utility functions I 
wrote: https://gist.github.com/favila/8e7ad6ea5b01bd7466ff
The function you are looking for is fast-pipeline-blocking.
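
For anyone who wants to see the general shape without opening the gist, 
here is a minimal sketch. It is not the code from the gist: the name 
unordered-pipeline-blocking is made up for illustration, and it takes a 
plain function instead of a transducer to keep it short. It assumes f never 
returns nil (nil cannot be put on a channel), and it does no exception 
handling at all: if f throws, that worker thread dies and its input value 
is lost, which is the tricky part mentioned above.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper, for illustration only.
;; Starts n threads. Each one repeatedly takes a value from `from`,
;; applies the blocking function f, and puts the result on `to`.
;; Results arrive on `to` in completion order, not input order.
(defn unordered-pipeline-blocking
  [n to f from]
  (let [workers (mapv (fn [_]
                        (a/thread
                          (loop []
                            ;; nil from <!! means `from` is closed and drained
                            (when-some [v (a/<!! from)]
                              ;; >!! returns false once `to` is closed, so stop then
                              (when (a/>!! to (f v))
                                (recur))))))
                      (range n))]
    ;; Shutdown: wait for every worker to finish, then close `to`.
    (a/thread
      (doseq [w workers]
        (a/<!! w))
      (a/close! to))
    to))

(comment
  ;; Three workers, so all three inputs start at once and the
  ;; results should come back in completion order.
  (let [to (a/chan 10)]
    (unordered-pipeline-blocking 3 to
                                 (fn [w] (Thread/sleep (* 100 w)) w)
                                 (a/to-chan [3 1 2]))
    (a/<!! (a/into [] to))))
```

Closing `to` only after all workers have returned is what keeps a consumer 
such as (a/into [] to) from finishing early. A real version would also need 
to decide what to do when f throws.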

On Thursday, November 27, 2014 3:32:43 PM UTC-6, Niels van Klaveren wrote:
>
> Recently, while creating load tests, I've needed to throttle certain 
> mixed IO/CPU-bound processes, and I have been using 
> com.climate.claypoole/upmap for those situations:
>
> (require '[com.climate.claypoole :as cp])
>
> (defn wait-and-return
>   [w]
>   (Thread/sleep (* 1000 w))
>   w)
>
> (def to-sort
>   [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10])
>
> (def timesorted
>   (time (doall (cp/upmap 20 wait-and-return to-sort))))
>
> timesorted
> "Elapsed time: 38004.512729 msecs"
> => (var clay.core/timesorted)
> => (0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38)
>
>
> This is just an example to show blocking processes returning in a 
> different order, while being restricted to a certain number of threads. I 
> know this won't work with a threadpool smaller than to-sort.
>
> Other parts of these tests would benefit from core.async techniques, but I 
> haven't found a satisfactory combination of the two. What I'm really 
> looking for is a way to use core.async pipeline-blocking syntax, which 
> takes a fixed number of parallel processes, a from channel, a transducer 
> and a to channel, but returns the results from the transducers unordered 
> (i.e. fastest delivered first). Something like this, but with results in 
> completion order rather than input order:
>
> (require '[clojure.core.async :as a])
>
> (def to-sort
>   (a/to-chan [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10]))
>
> (defn wait-and-return
>   [w]
>   (Thread/sleep (* 1000 w))
>   w)
>
> (def sorted
>   (a/chan))
>
> (def xwait
>   (map wait-and-return))
>
> (def sorter
>   (a/pipeline-blocking 20 sorted xwait to-sort))
>
> (time (a/<!! (a/into [] sorted)))
>
>
> Is there a function like that, or is there a recommended way to do 
> something like this in core.async?
>
> Regards,
>
> Niels
>

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en