On the Clojure side the only thing you can do is to serialize writes to the resouce (the db connection). So at most one thread can be performing an update operation, at a given time. You can increase concurrency by mantaining "read-only" db connections (most likely in an ad-hoc/by-convention manner) - they can be safely shared across threads.
As Akhil points out, agents are a good option for implementing serialized access. This doesn't scale very well though, particularly across machines. A proper solution must necessarily come from using the transactionality provided by the database (if any - I'm not familiar with Mongo). Finally, it's worth pointing out that implementing queues as database tables/objects is most likely a bad idea. There are better tools for the job. On Wednesday, March 6, 2013 5:36:13 PM UTC+1, bruce li wrote: > > Hello, > I'm working on a piece of code that uses congomongo to access mongodb. > What I want to implement is to use one collection of the DB as a > queue(let's say, it's called "task"). > > Using congomongo, it's easy to fetch a task that is of status :queue : > (def t (fetch-one :task :where {:status :queue})), > and also it's simple to update it: (update! :task t (assoc t :status > :running)) > > But how to make a safe and consistent "dequeue" operation in concurrent > context? A naive dequeue should look like this: > > (let [t (fetch-one:task :where {:status :queue})] > (update! :task t (assoc t :status :running)) > t) > > But taking concurrency into consideration, this implementation is > error-prone, at least from my point of view. Consider this: > > When 2 threads are fetching the task, it's very likely that the task is > fetched twice using the previous piece of code and they would be executed > twice. > > Do we have something like transaction? Something that will enforce the > "fetch-one" and "update!" statement are both executed before another > “fetch-one" operation is adopted? > > What I can think of is to use an agent. And my code looks like this: > > (def db-agent (agent nil)) > (defn dequeue [] > (letfn [(do-dequeue [da] > (let [task (mongo/fetch-one :task :where {:status :queue})] > (when task > (mongo/update! :task feed (assoc task :status :running))) > task))] > (send db-agent do-dequeue) > @db-agent))) > > However, I still doubt the correctness of this solution. Right before the > @db-agent is called, won't another thread call "dequeue", which will > involve another "send” and change the value of db-agent? > > I'm wondering if anyone can help. Thanks. > > > > -- -- You received this message because you are subscribed to the Google Groups "Clojure" group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups "Clojure" group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/groups/opt_out.