Thanks for that Larry but I think this is a bit of overkill for my scenario. The code I pasted is almost verbatim what we have in our production codebase, so the ability to queue new jobs etc is really not needed. Cheers though.
On Thursday, September 18, 2014 9:38:47 AM UTC+10, larry google groups wrote: > > > > We don't have streams of data here, the long running tasks have > side-effects. I would > > prefer to avoid adding another whole framework just to run a few long > running jobs in p//. > > > I guess I should show you some code, so you can see how simple this is. > I'll copy-and-paste some code that I use. > > One simple way I use Lamina is to save stuff to a database. I don't want > the "save" action happening in my main thread, so I put the data in a > channel, and I let some workers pull that data off the channel and put it > in the database. So what follows is the whole file, this about 30 lines of > code, including some try/catch stuff that you probably don't need: > > (ns loupi.persistence-queue > (:require > [loupi.persistence :as persistence] > [slingshot.slingshot :as ss] > [lamina.core :as lamina])) > > (def ^:private persistence-channel (lamina/channel)) > > (defn persist-this-item [context-wrapper-for-database-call] > (lamina/enqueue persistence-channel > (fn [] > (ss/try+ > (persistence/make-consistent > context-wrapper-for-database-call) > (catch Object o (ss/throw+ {:type > :loupi.supervisor/problem > :message "Error in > persistence-queue/persist-this-itme." > :data o})))))) > > (defn worker [] > (loop [closure-with-item-inside @(lamina/read-channel > persistence-channel)] > (ss/try+ > (closure-with-item-inside) > (catch Object o (ss/throw+ {:type :loupi.supervisor/problem > :message "Error in > persistence-queue/worker." > :closure closure-with-item-inside > :data o}))) > (recur @(lamina/read-channel persistence-channel)))) > > (defn start-workers [] > (dotimes [_ 6] > (println "Starting up the persist queue workers.") > (future (worker)))) > > > > I call (start-workers) when the app starts. When I save something to the > database, I call (persist-this-item) and I put a closure on the channel. > The workers eventually grab that closure and execute it. > > Clearly, that closure can do whatever you like. To borrow from your > original example, that closure is where you would put: > > (long-running-widget-processor widget) > > > > > > > > > On Tuesday, September 16, 2014 10:00:07 PM UTC-4, Beau Fabry wrote: >> >> We don't have streams of data here, the long running tasks have >> side-effects. I would prefer to avoid adding another whole framework just >> to run a few long running jobs in p//. >> >> I have a list of jobs to do, I'm partitioning that list up into 4 sub >> lists to be worked through by 4 p// workers, I then want to block and wait >> until all 4 workers have finished their tasks. >> >> On Wednesday, September 17, 2014 3:27:07 AM UTC+10, larry google groups >> wrote: >>> >>> >>> This does not look correct to me. Perhaps someone else has more insight >>> into this. I am suspicious about 2 things: >>> >>> 1.) your use of doall >>> >>> 2.) your use of (thread) >>> >>> It looks to me like you are trying to hack together a kind of pipeline >>> or channel. Clojure has a wealth of libraries that can handle that for you. >>> The main thing you are trying to do is this: >>> >>> (long-running-widget-processor widget)) >>> >>> >>> You go to some trouble to set up workers, all to ensure that >>> long-running-widget-processor >>> is handled in its own thread. >>> >>> I would suggest you look at Lamina: >>> >>> https://github.com/ztellman/lamina >>> >>> In particular, look at pipelines: >>> >>> https://github.com/ztellman/lamina/wiki/Pipelines >>> >>> >>> >>> >>> >>> On Friday, September 5, 2014 1:46:02 AM UTC-4, Beau Fabry wrote: >>>> >>>> Is the kinda ugly constant (doall usage a sign that I'm doing >>>> something silly? >>>> >>>> (let [num-workers 4 >>>> widgets-per-worker (inc (int (/ (count widgets) num-workers))) >>>> bucketed-widgets (partition-all widgets-per-worker widgets) >>>> workers (doall (map (fn [widgets] >>>> (thread >>>> (doseq [widget widgets] >>>> (long-running-widget-processor widget)) >>>> true)) >>>> bucketed-widgets))] >>>> (doall (map <!! workers))) >>>> >>>> https://gist.github.com/bfabry/ad830b1888e4fc550f88 >>>> >>>> All comments appreciated :-) >>>> >>>> Cheers, >>>> Beau >>>> >>> -- You received this message because you are subscribed to the Google Groups "Clojure" group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups "Clojure" group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.