This wasn't included in core.async mostly because it's fairly easy to write. The code looks something like this (untested):
(def from-chan) (def to-chan) (dotimes [_ num-threads] (let [rf (fn [_ x] (>!! to-chan)) f (xform rf)] (thread (loop [] (when-some [x (<!! from-chan)] (f x)))))) I'll leave it up to you to decide when to terminate or how to shut things down. Timothy Baldridge On Thu, Nov 27, 2014 at 2:32 PM, Niels van Klaveren < niels.vanklave...@gmail.com> wrote: > Recently in creating load testing I've been in need of throttling certain > mixed IO/CPU bound processes and have been using claypoole.core/upmap for > those situations > > (require '[com.climate.claypoole :as cp]) > > (defn wait-and-return > [w] > (Thread/sleep (* 1000 w)) > w) > > (def to-sort > [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10]) > > (def timesorted > (time (doall (cp/upmap 20 wait-and-return to-sort)))) > > timesorted > "Elapsed time: 38004.512729 msecs" > => (var clay.core/timesorted) > => (0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38) > > > This is just an example to show blocking processes returning in a > different order, while being restricted to a certain number of threads. I > know this won't work with a threadpool smaller than to-sort. > > Other parts of these tests would benefit from core.async techniques, but I > haven't found a satisfactory combination of the two. What I'm really > looking for is a way to use core.async pipeline-blocking syntax, which > takes a fixed number of parallel processes, a from channel, a transducer > and a to channel, but returns the results from the transducers unordered > (ie. fasted delivered first. Something like this, but with an ordered > outcome. > > (require '[clojure.core.async :as a]) > > (def to-sort > (a/to-chan [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10])) > > (defn wait-and-return > [w] > (Thread/sleep (* 1000 w)) > w) > > (def sorted > (a/chan)) > > (def xwait > (map wait-and-return)) > > (def sorter > (a/pipeline-blocking 20 sorted xwait to-sort)) > > (time (a/<!! (a/into [] sorted))) > > > Is there a function like that, or would there be a recommended way to do > something like this in core.async ? > > Regards, > > Niels > > -- > You received this message because you are subscribed to the Google > Groups "Clojure" group. > To post to this group, send email to clojure@googlegroups.com > Note that posts from new members are moderated - please be patient with > your first post. > To unsubscribe from this group, send email to > clojure+unsubscr...@googlegroups.com > For more options, visit this group at > http://groups.google.com/group/clojure?hl=en > --- > You received this message because you are subscribed to the Google Groups > "Clojure" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to clojure+unsubscr...@googlegroups.com. > For more options, visit https://groups.google.com/d/optout. > -- “One of the main causes of the fall of the Roman Empire was that–lacking zero–they had no way to indicate successful termination of their C programs.” (Robert Firth) -- You received this message because you are subscribed to the Google Groups "Clojure" group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups "Clojure" group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.