Re: Core.async unordered pipeline-blocking ?

2014-11-29 Thread Stuart Sierra
Here's a version I did that only uses as many threads as necessary to keep up with the input, subject to an upper bound: http://stuartsierra.com/2013/12/08/parallel-processing-with-core-async –S -- You received this message because you are subscribed to the Google Groups Clojure group. To post

Re: Core.async unordered pipeline-blocking ?

2014-11-28 Thread Francis Avila
I had a need for this too some time ago. It's not very hard to write yourself. Whatever trickiness there is in these functions is in handling exceptions and orchestrating shutdown. I put my version up in a gist with some other async utility functions I wrote:

Re: Core.async unordered pipeline-blocking ?

2014-11-28 Thread Niels van Klaveren
Thanks so much for sharing, Francis ! It might be simple to some, but I haven't had an opportunity yet to get acquainted well enough with core.async. Clojure has so much useful libraries, but some force you to get your head around (for me) completely new paradigms which can take time. Reading

Core.async unordered pipeline-blocking ?

2014-11-27 Thread Niels van Klaveren
Recently in creating load testing I've been in need of throttling certain mixed IO/CPU bound processes and have been using claypoole.core/upmap for those situations (require '[com.climate.claypoole :as cp]) (defn wait-and-return [w] (Thread/sleep (* 1000 w)) w) (def to-sort [38 20 22

Re: Core.async unordered pipeline-blocking ?

2014-11-27 Thread Timothy Baldridge
This wasn't included in core.async mostly because it's fairly easy to write. The code looks something like this (untested): (def from-chan) (def to-chan) (dotimes [_ num-threads] (let [rf (fn [_ x] (!! to-chan)) f (xform rf)] (thread (loop [] (when-some [x (!!

Re: Core.async unordered pipeline-blocking ?

2014-11-27 Thread Timothy Baldridge
eh, that one line should be: (let [rf (fn [_ x] (!! to-chan x)) ...) On Thu, Nov 27, 2014 at 3:15 PM, Timothy Baldridge tbaldri...@gmail.com wrote: This wasn't included in core.async mostly because it's fairly easy to write. The code looks something like this (untested): (def from-chan)