Hello, Colin,

I suspected I should turn to existing Java concurrency constructs. Thank 
you very much for your response, and this is what I'm going to do. I was 
just hoping there's some Clojure idiomatic way to solve this, using agents, 
futures, promises, refs, and other Clojure stuff. For example, if there 
were a function taking a list of agents and return any one of them which is 
ready (vs. all of them), I would be able to implement my example relatively 
simply. Just wanted to make sure I'm not missing anything.

Artem.

On Thursday, May 30, 2013 2:12:02 AM UTC-7, Colin Yates wrote:
>
> Can you not use 
> http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html?
>  
>  That will provide the blocking element.  
>
> To execute N (i.e. 10 in your example) use a 
> http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html.
>  
>  The 'glue' would be an infinite loop which .takes from the incoming 
> sequence (which could also be a LBQ) and then puts it on the thread pool.
>
> That gets stuff happening in parallel.  
>
> To consume the results of that stuff in a sequence have a(nother) LBQ 
> which the consumers consume (using the blocking .take) and have the glue 
> code wrap the function it received from the LBQ in a function which takes 
> the result of that function and puts it on the sequence.  
>
> This looks like (clojure forgiveness is required):
>
> [code]
> (def incoming-queue (java....LinkedBlockingQueue.))
> (def outgoing-queue (java....LinkedBlockingQueue.))
> (def workers (java... some thread pool/executor.))
>
> ; the following would need to reify itself to be a Runnable, not got that 
> far yet :)
> (defn execute [job result-queue] (let [result (job)] (.put result-queue 
> result)))
>
> (def stop-loop (atom false))
> (while (not @stop-loop)
>   (def next (.take incoming-queue))
>   (execute next outgoing-queue))
> [/code]
>
> A few caveats/notes:
>  - this uses a lot of Java constructs - that is fine.  It is perfectly 
> idiomatic to use the right Clojure or Java constructs.  LBQs rock.
>  - the above won't compile and the 'execute' needs to return a Runnable - 
> not sure how.
>  - it ties up a worker thread until the result can be put onto the 
> outgoing LBQ.  If the outgoing LBQ is bounded and you don't have enough 
> consumers then eventually all the worker threads will be effectively idle 
> until the results can be consumed.  
>  - if you didn't want to use a ThreadPool then you could update 'executor' 
> to maintain an (atom) number of currently executing jobs.  The glue code is 
> single threaded so no chance of multiple jobs starting in parallel.  The 
> single threaded 'cost' is fine as it is doing nothing other than moving 
> things around.
>
> I am a (Clojure) newbie so be warned!  I fully look forward to somebody 
> providing a much nicer and more idiomatic Clojure implementation :).
>
> Hope this helps.
>
> Col
>
> On Thursday, 30 May 2013 06:19:29 UTC+1, Artem Boytsov wrote:
>>
>> Hello, folks!
>>
>> I'm a relative noob in Clojure especially when it comes to concurrency, 
>> so please forgive my ignorance. I have a processing stage (producer) that 
>> feeds to another one (consumer). The producer has a bunch of items to 
>> process and it's I/O blocking which takes random time, but the order of the 
>> items is insignificant, so ideally they would materialize on the consumer 
>> side on the first come first serve basis.
>>
>> I would like to create a blocking lazy sequence I could just give to the 
>> consumer. I know how to create a lazy sequence (lazy-seq), or how to make 
>> it run in background and block on results (seque), but what I can't wrap my 
>> head around is how parallelize the processing the Clojure way. I was 
>> considering kicking off multiple agents, but how can I wait for *any one 
>> *of them to finish, not all of them (as await does)? I'm not sure but I 
>> think the same goes for futures/promises. I could have multiple agents 
>> putting the results into some shared sequence, but then how do I block on 
>> the sequence itself?
>>
>> What I'm trying to do can be described in the following way in a silly 
>> imperative pseudo-code:
>>
>> workers = new Worker[10]                       ; initially w.got_data == 
>> nil 
>> for each x in source_data:
>>    w = wait_for_any_worker_ready(workers)      ; initially all of them 
>> are ready
>>    if (w.got_data) 
>>      output.enqueue(w.data)                    ; the consumer will read 
>> output in a blocking way
>>      w.process(x)                              ; non-blocking, kicks off 
>> in the background
>>
>> Or, another way to describe it, given a seq of integers:
>>
>> [ 1, 2, 3, 4 ... ]
>>
>> and a simple function with a variable delay:
>>
>> (defn process [x]
>>    (Thread/sleep (* 10000 (rand)))
>>    (* 2 x))
>>
>> How can I write a function which would return a blocking lazy sequence of 
>> processed integers, in arbitrary order, parallelizing the processing in up 
>> to 10 threads?
>>
>> Thank you!
>>
>> Artem.
>>
>

-- 
-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.


Reply via email to