Re: Core.async unordered pipeline-blocking ?
Here's a version I did that only uses as many threads as necessary to keep up with the input, subject to an upper bound: http://stuartsierra.com/2013/12/08/parallel-processing-with-core-async –S -- You received this message because you are subscribed to the Google Groups Clojure group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups Clojure group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.
Re: Core.async unordered pipeline-blocking ?
I had a need for this too some time ago. It's not very hard to write yourself. Whatever trickiness there is in these functions is in handling exceptions and orchestrating shutdown. I put my version up in a gist with some other async utility functions I wrote: https://gist.github.com/favila/8e7ad6ea5b01bd7466ff You are looking for fast-pipeline-blocking. On Thursday, November 27, 2014 3:32:43 PM UTC-6, Niels van Klaveren wrote: Recently in creating load testing I've been in need of throttling certain mixed IO/CPU bound processes and have been using claypoole.core/upmap for those situations (require '[com.climate.claypoole :as cp]) (defn wait-and-return [w] (Thread/sleep (* 1000 w)) w) (def to-sort [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10]) (def timesorted (time (doall (cp/upmap 20 wait-and-return to-sort timesorted Elapsed time: 38004.512729 msecs = (var clay.core/timesorted) = (0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38) This is just an example to show blocking processes returning in a different order, while being restricted to a certain number of threads. I know this won't work with a threadpool smaller than to-sort. Other parts of these tests would benefit from core.async techniques, but I haven't found a satisfactory combination of the two. What I'm really looking for is a way to use core.async pipeline-blocking syntax, which takes a fixed number of parallel processes, a from channel, a transducer and a to channel, but returns the results from the transducers unordered (ie. fasted delivered first. Something like this, but with an ordered outcome. (require '[clojure.core.async :as a]) (def to-sort (a/to-chan [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10])) (defn wait-and-return [w] (Thread/sleep (* 1000 w)) w) (def sorted (a/chan)) (def xwait (map wait-and-return)) (def sorter (a/pipeline-blocking 20 sorted xwait to-sort)) (time (a/!! (a/into [] sorted))) Is there a function like that, or would there be a recommended way to do something like this in core.async ? Regards, Niels -- You received this message because you are subscribed to the Google Groups Clojure group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups Clojure group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.
Re: Core.async unordered pipeline-blocking ?
Thanks so much for sharing, Francis ! It might be simple to some, but I haven't had an opportunity yet to get acquainted well enough with core.async. Clojure has so much useful libraries, but some force you to get your head around (for me) completely new paradigms which can take time. Reading your code did provide me a with lot of new insights, as well as having a very useful tool I can use. All time I can devote to learning Clojure have been in projects at work I have to do by myself, often needing to be put together on short notice. There's a snowball's chance in hell of collaboration with my colleagues from the firmly Java entrenched development department. I need to have a solid idea how I can get from start to finish in a very short time, for which Clojure has been quite awesome in some ways, but quite hard in others. The awesome community is often just the push in the back to get over these hurdles though. Now I have all parts in hand I can start improving some more mature projects by introducing core.async into them. Wiring up existing pieces will get me more experience and confidence to dive deeper into async libraries. Thanks again for the last part of the puzzle, I'm pretty sure I will be able to finish putting it together now ! On Friday, November 28, 2014 6:10:05 PM UTC+1, Francis Avila wrote: I had a need for this too some time ago. It's not very hard to write yourself. Whatever trickiness there is in these functions is in handling exceptions and orchestrating shutdown. I put my version up in a gist with some other async utility functions I wrote: https://gist.github.com/favila/8e7ad6ea5b01bd7466ff You are looking for fast-pipeline-blocking. On Thursday, November 27, 2014 3:32:43 PM UTC-6, Niels van Klaveren wrote: Recently in creating load testing I've been in need of throttling certain mixed IO/CPU bound processes and have been using claypoole.core/upmap for those situations (require '[com.climate.claypoole :as cp]) (defn wait-and-return [w] (Thread/sleep (* 1000 w)) w) (def to-sort [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10]) (def timesorted (time (doall (cp/upmap 20 wait-and-return to-sort timesorted Elapsed time: 38004.512729 msecs = (var clay.core/timesorted) = (0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38) This is just an example to show blocking processes returning in a different order, while being restricted to a certain number of threads. I know this won't work with a threadpool smaller than to-sort. Other parts of these tests would benefit from core.async techniques, but I haven't found a satisfactory combination of the two. What I'm really looking for is a way to use core.async pipeline-blocking syntax, which takes a fixed number of parallel processes, a from channel, a transducer and a to channel, but returns the results from the transducers unordered (ie. fasted delivered first. Something like this, but with an ordered outcome. (require '[clojure.core.async :as a]) (def to-sort (a/to-chan [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10])) (defn wait-and-return [w] (Thread/sleep (* 1000 w)) w) (def sorted (a/chan)) (def xwait (map wait-and-return)) (def sorter (a/pipeline-blocking 20 sorted xwait to-sort)) (time (a/!! (a/into [] sorted))) Is there a function like that, or would there be a recommended way to do something like this in core.async ? Regards, Niels -- You received this message because you are subscribed to the Google Groups Clojure group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups Clojure group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.
Core.async unordered pipeline-blocking ?
Recently in creating load testing I've been in need of throttling certain mixed IO/CPU bound processes and have been using claypoole.core/upmap for those situations (require '[com.climate.claypoole :as cp]) (defn wait-and-return [w] (Thread/sleep (* 1000 w)) w) (def to-sort [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10]) (def timesorted (time (doall (cp/upmap 20 wait-and-return to-sort timesorted Elapsed time: 38004.512729 msecs = (var clay.core/timesorted) = (0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38) This is just an example to show blocking processes returning in a different order, while being restricted to a certain number of threads. I know this won't work with a threadpool smaller than to-sort. Other parts of these tests would benefit from core.async techniques, but I haven't found a satisfactory combination of the two. What I'm really looking for is a way to use core.async pipeline-blocking syntax, which takes a fixed number of parallel processes, a from channel, a transducer and a to channel, but returns the results from the transducers unordered (ie. fasted delivered first. Something like this, but with an ordered outcome. (require '[clojure.core.async :as a]) (def to-sort (a/to-chan [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10])) (defn wait-and-return [w] (Thread/sleep (* 1000 w)) w) (def sorted (a/chan)) (def xwait (map wait-and-return)) (def sorter (a/pipeline-blocking 20 sorted xwait to-sort)) (time (a/!! (a/into [] sorted))) Is there a function like that, or would there be a recommended way to do something like this in core.async ? Regards, Niels -- You received this message because you are subscribed to the Google Groups Clojure group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups Clojure group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.
Re: Core.async unordered pipeline-blocking ?
This wasn't included in core.async mostly because it's fairly easy to write. The code looks something like this (untested): (def from-chan) (def to-chan) (dotimes [_ num-threads] (let [rf (fn [_ x] (!! to-chan)) f (xform rf)] (thread (loop [] (when-some [x (!! from-chan)] (f x)) I'll leave it up to you to decide when to terminate or how to shut things down. Timothy Baldridge On Thu, Nov 27, 2014 at 2:32 PM, Niels van Klaveren niels.vanklave...@gmail.com wrote: Recently in creating load testing I've been in need of throttling certain mixed IO/CPU bound processes and have been using claypoole.core/upmap for those situations (require '[com.climate.claypoole :as cp]) (defn wait-and-return [w] (Thread/sleep (* 1000 w)) w) (def to-sort [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10]) (def timesorted (time (doall (cp/upmap 20 wait-and-return to-sort timesorted Elapsed time: 38004.512729 msecs = (var clay.core/timesorted) = (0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38) This is just an example to show blocking processes returning in a different order, while being restricted to a certain number of threads. I know this won't work with a threadpool smaller than to-sort. Other parts of these tests would benefit from core.async techniques, but I haven't found a satisfactory combination of the two. What I'm really looking for is a way to use core.async pipeline-blocking syntax, which takes a fixed number of parallel processes, a from channel, a transducer and a to channel, but returns the results from the transducers unordered (ie. fasted delivered first. Something like this, but with an ordered outcome. (require '[clojure.core.async :as a]) (def to-sort (a/to-chan [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10])) (defn wait-and-return [w] (Thread/sleep (* 1000 w)) w) (def sorted (a/chan)) (def xwait (map wait-and-return)) (def sorter (a/pipeline-blocking 20 sorted xwait to-sort)) (time (a/!! (a/into [] sorted))) Is there a function like that, or would there be a recommended way to do something like this in core.async ? Regards, Niels -- You received this message because you are subscribed to the Google Groups Clojure group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups Clojure group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout. -- “One of the main causes of the fall of the Roman Empire was that–lacking zero–they had no way to indicate successful termination of their C programs.” (Robert Firth) -- You received this message because you are subscribed to the Google Groups Clojure group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups Clojure group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.
Re: Core.async unordered pipeline-blocking ?
eh, that one line should be: (let [rf (fn [_ x] (!! to-chan x)) ...) On Thu, Nov 27, 2014 at 3:15 PM, Timothy Baldridge tbaldri...@gmail.com wrote: This wasn't included in core.async mostly because it's fairly easy to write. The code looks something like this (untested): (def from-chan) (def to-chan) (dotimes [_ num-threads] (let [rf (fn [_ x] (!! to-chan)) f (xform rf)] (thread (loop [] (when-some [x (!! from-chan)] (f x)) I'll leave it up to you to decide when to terminate or how to shut things down. Timothy Baldridge On Thu, Nov 27, 2014 at 2:32 PM, Niels van Klaveren niels.vanklave...@gmail.com wrote: Recently in creating load testing I've been in need of throttling certain mixed IO/CPU bound processes and have been using claypoole.core/upmap for those situations (require '[com.climate.claypoole :as cp]) (defn wait-and-return [w] (Thread/sleep (* 1000 w)) w) (def to-sort [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10]) (def timesorted (time (doall (cp/upmap 20 wait-and-return to-sort timesorted Elapsed time: 38004.512729 msecs = (var clay.core/timesorted) = (0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38) This is just an example to show blocking processes returning in a different order, while being restricted to a certain number of threads. I know this won't work with a threadpool smaller than to-sort. Other parts of these tests would benefit from core.async techniques, but I haven't found a satisfactory combination of the two. What I'm really looking for is a way to use core.async pipeline-blocking syntax, which takes a fixed number of parallel processes, a from channel, a transducer and a to channel, but returns the results from the transducers unordered (ie. fasted delivered first. Something like this, but with an ordered outcome. (require '[clojure.core.async :as a]) (def to-sort (a/to-chan [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10])) (defn wait-and-return [w] (Thread/sleep (* 1000 w)) w) (def sorted (a/chan)) (def xwait (map wait-and-return)) (def sorter (a/pipeline-blocking 20 sorted xwait to-sort)) (time (a/!! (a/into [] sorted))) Is there a function like that, or would there be a recommended way to do something like this in core.async ? Regards, Niels -- You received this message because you are subscribed to the Google Groups Clojure group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups Clojure group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout. -- “One of the main causes of the fall of the Roman Empire was that–lacking zero–they had no way to indicate successful termination of their C programs.” (Robert Firth) -- “One of the main causes of the fall of the Roman Empire was that–lacking zero–they had no way to indicate successful termination of their C programs.” (Robert Firth) -- You received this message because you are subscribed to the Google Groups Clojure group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups Clojure group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.