If you are looking for a more idiomatic solution, https://github.com/jpalmucci/clj-yield wraps a lazy sequence around a blocking queue.
On May 30, 2013, at 11:58 AM, Artem Boytsov <aboyt...@gmail.com> wrote: > Hello, Colin, > > I suspected I should turn to existing Java concurrency constructs. Thank you > very much for your response, and this is what I'm going to do. I was just > hoping there's some Clojure idiomatic way to solve this, using agents, > futures, promises, refs, and other Clojure stuff. For example, if there were > a function taking a list of agents and return any one of them which is ready > (vs. all of them), I would be able to implement my example relatively simply. > Just wanted to make sure I'm not missing anything. > > Artem. > > On Thursday, May 30, 2013 2:12:02 AM UTC-7, Colin Yates wrote: > Can you not use > http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html? > That will provide the blocking element. > > To execute N (i.e. 10 in your example) use a > http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html. > The 'glue' would be an infinite loop which .takes from the incoming > sequence (which could also be a LBQ) and then puts it on the thread pool. > > That gets stuff happening in parallel. > > To consume the results of that stuff in a sequence have a(nother) LBQ which > the consumers consume (using the blocking .take) and have the glue code wrap > the function it received from the LBQ in a function which takes the result of > that function and puts it on the sequence. > > This looks like (clojure forgiveness is required): > > [code] > (def incoming-queue (java....LinkedBlockingQueue.)) > (def outgoing-queue (java....LinkedBlockingQueue.)) > (def workers (java... some thread pool/executor.)) > > ; the following would need to reify itself to be a Runnable, not got that far > yet :) > (defn execute [job result-queue] (let [result (job)] (.put result-queue > result))) > > (def stop-loop (atom false)) > (while (not @stop-loop) > (def next (.take incoming-queue)) > (execute next outgoing-queue)) > [/code] > > A few caveats/notes: > - this uses a lot of Java constructs - that is fine. It is perfectly > idiomatic to use the right Clojure or Java constructs. LBQs rock. > - the above won't compile and the 'execute' needs to return a Runnable - not > sure how. > - it ties up a worker thread until the result can be put onto the outgoing > LBQ. If the outgoing LBQ is bounded and you don't have enough consumers then > eventually all the worker threads will be effectively idle until the results > can be consumed. > - if you didn't want to use a ThreadPool then you could update 'executor' to > maintain an (atom) number of currently executing jobs. The glue code is > single threaded so no chance of multiple jobs starting in parallel. The > single threaded 'cost' is fine as it is doing nothing other than moving > things around. > > I am a (Clojure) newbie so be warned! I fully look forward to somebody > providing a much nicer and more idiomatic Clojure implementation :). > > Hope this helps. > > Col > > On Thursday, 30 May 2013 06:19:29 UTC+1, Artem Boytsov wrote: > Hello, folks! > > I'm a relative noob in Clojure especially when it comes to concurrency, so > please forgive my ignorance. I have a processing stage (producer) that feeds > to another one (consumer). The producer has a bunch of items to process and > it's I/O blocking which takes random time, but the order of the items is > insignificant, so ideally they would materialize on the consumer side on the > first come first serve basis. > > I would like to create a blocking lazy sequence I could just give to the > consumer. I know how to create a lazy sequence (lazy-seq), or how to make it > run in background and block on results (seque), but what I can't wrap my head > around is how parallelize the processing the Clojure way. I was considering > kicking off multiple agents, but how can I wait for any one of them to > finish, not all of them (as await does)? I'm not sure but I think the same > goes for futures/promises. I could have multiple agents putting the results > into some shared sequence, but then how do I block on the sequence itself? > > What I'm trying to do can be described in the following way in a silly > imperative pseudo-code: > > workers = new Worker[10] ; initially w.got_data == nil > for each x in source_data: > w = wait_for_any_worker_ready(workers) ; initially all of them are > ready > if (w.got_data) > output.enqueue(w.data) ; the consumer will read > output in a blocking way > w.process(x) ; non-blocking, kicks off in > the background > > Or, another way to describe it, given a seq of integers: > > [ 1, 2, 3, 4 ... ] > > and a simple function with a variable delay: > > (defn process [x] > (Thread/sleep (* 10000 (rand))) > (* 2 x)) > > How can I write a function which would return a blocking lazy sequence of > processed integers, in arbitrary order, parallelizing the processing in up to > 10 threads? > > Thank you! > > Artem. > > -- > -- > You received this message because you are subscribed to the Google > Groups "Clojure" group. > To post to this group, send email to clojure@googlegroups.com > Note that posts from new members are moderated - please be patient with your > first post. > To unsubscribe from this group, send email to > clojure+unsubscr...@googlegroups.com > For more options, visit this group at > http://groups.google.com/group/clojure?hl=en > --- > You received this message because you are subscribed to the Google Groups > "Clojure" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to clojure+unsubscr...@googlegroups.com. > For more options, visit https://groups.google.com/groups/opt_out. > > -- -- You received this message because you are subscribed to the Google Groups "Clojure" group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups "Clojure" group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/groups/opt_out.