Hello, Colin, I suspected I should turn to existing Java concurrency constructs. Thank you very much for your response, and this is what I'm going to do. I was just hoping there's some Clojure idiomatic way to solve this, using agents, futures, promises, refs, and other Clojure stuff. For example, if there were a function taking a list of agents and return any one of them which is ready (vs. all of them), I would be able to implement my example relatively simply. Just wanted to make sure I'm not missing anything.
Artem. On Thursday, May 30, 2013 2:12:02 AM UTC-7, Colin Yates wrote: > > Can you not use > http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html? > > That will provide the blocking element. > > To execute N (i.e. 10 in your example) use a > http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html. > > The 'glue' would be an infinite loop which .takes from the incoming > sequence (which could also be a LBQ) and then puts it on the thread pool. > > That gets stuff happening in parallel. > > To consume the results of that stuff in a sequence have a(nother) LBQ > which the consumers consume (using the blocking .take) and have the glue > code wrap the function it received from the LBQ in a function which takes > the result of that function and puts it on the sequence. > > This looks like (clojure forgiveness is required): > > [code] > (def incoming-queue (java....LinkedBlockingQueue.)) > (def outgoing-queue (java....LinkedBlockingQueue.)) > (def workers (java... some thread pool/executor.)) > > ; the following would need to reify itself to be a Runnable, not got that > far yet :) > (defn execute [job result-queue] (let [result (job)] (.put result-queue > result))) > > (def stop-loop (atom false)) > (while (not @stop-loop) > (def next (.take incoming-queue)) > (execute next outgoing-queue)) > [/code] > > A few caveats/notes: > - this uses a lot of Java constructs - that is fine. It is perfectly > idiomatic to use the right Clojure or Java constructs. LBQs rock. > - the above won't compile and the 'execute' needs to return a Runnable - > not sure how. > - it ties up a worker thread until the result can be put onto the > outgoing LBQ. If the outgoing LBQ is bounded and you don't have enough > consumers then eventually all the worker threads will be effectively idle > until the results can be consumed. > - if you didn't want to use a ThreadPool then you could update 'executor' > to maintain an (atom) number of currently executing jobs. The glue code is > single threaded so no chance of multiple jobs starting in parallel. The > single threaded 'cost' is fine as it is doing nothing other than moving > things around. > > I am a (Clojure) newbie so be warned! I fully look forward to somebody > providing a much nicer and more idiomatic Clojure implementation :). > > Hope this helps. > > Col > > On Thursday, 30 May 2013 06:19:29 UTC+1, Artem Boytsov wrote: >> >> Hello, folks! >> >> I'm a relative noob in Clojure especially when it comes to concurrency, >> so please forgive my ignorance. I have a processing stage (producer) that >> feeds to another one (consumer). The producer has a bunch of items to >> process and it's I/O blocking which takes random time, but the order of the >> items is insignificant, so ideally they would materialize on the consumer >> side on the first come first serve basis. >> >> I would like to create a blocking lazy sequence I could just give to the >> consumer. I know how to create a lazy sequence (lazy-seq), or how to make >> it run in background and block on results (seque), but what I can't wrap my >> head around is how parallelize the processing the Clojure way. I was >> considering kicking off multiple agents, but how can I wait for *any one >> *of them to finish, not all of them (as await does)? I'm not sure but I >> think the same goes for futures/promises. I could have multiple agents >> putting the results into some shared sequence, but then how do I block on >> the sequence itself? >> >> What I'm trying to do can be described in the following way in a silly >> imperative pseudo-code: >> >> workers = new Worker[10] ; initially w.got_data == >> nil >> for each x in source_data: >> w = wait_for_any_worker_ready(workers) ; initially all of them >> are ready >> if (w.got_data) >> output.enqueue(w.data) ; the consumer will read >> output in a blocking way >> w.process(x) ; non-blocking, kicks off >> in the background >> >> Or, another way to describe it, given a seq of integers: >> >> [ 1, 2, 3, 4 ... ] >> >> and a simple function with a variable delay: >> >> (defn process [x] >> (Thread/sleep (* 10000 (rand))) >> (* 2 x)) >> >> How can I write a function which would return a blocking lazy sequence of >> processed integers, in arbitrary order, parallelizing the processing in up >> to 10 threads? >> >> Thank you! >> >> Artem. >> > -- -- You received this message because you are subscribed to the Google Groups "Clojure" group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups "Clojure" group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/groups/opt_out.