If the upstream channel is permanent, it won't close when all of its downstream channels are closed. You can create a permanent channel using (permanent-channel) or (channel* :permanent? true). Once you have that, you can replace all that code with a simple (siphon perm-ch conn-ch).
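To make that concrete, here is a rough sketch of the idea rather than tested code. It assumes channel, enqueue, close and closed? from lamina.core alongside the permanent-channel and siphon mentioned above. perm-ch, conn-1 and conn-2 are placeholder names, and attach-client! is a hypothetical helper, not part of lamina.

```clojure
(use 'lamina.core)

;; Long-lived upstream channel owned by the library. Because it is
;; permanent, closing a downstream channel should not close it.
(def perm-ch (permanent-channel))

;; Hypothetical helper: forward everything from perm-ch into the
;; channel that represents one client connection.
(defn attach-client! [conn-ch]
  (siphon perm-ch conn-ch))

;; First client connects, receives an event, then goes away.
(def conn-1 (channel))
(attach-client! conn-1)
(enqueue perm-ch :event-1)
(close conn-1)

;; The upstream channel should still be open here and should keep
;; accepting messages, which then wait for the next consumer.
(closed? perm-ch)
(enqueue perm-ch :event-2)

;; A later client attaches to the same upstream channel.
(def conn-2 (channel))
(attach-client! conn-2)
```

In a web service, conn-1 and conn-2 would be whatever channels represent the individual client connections, so each request only ever touches attach-client!.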
As an aside, though, you could also accomplish the above without delving into functions that are really implementation details:

    ;; Return already-queued events if there are any; otherwise wait
    ;; for one message, giving up after POLL-TIMEOUT.
    (if-let [evs (seq (channel->seq ch))]
      evs
      (run-pipeline (read-channel* ch :on-timeout ::timeout, :timeout POLL-TIMEOUT)
        (fn [msg]
          (when-not (= ::timeout msg)
            (conj (channel->seq ch) msg)))))

This is non-blocking, but will require some outer handler to deal with the async-result.

Also, in the future you'll probably get a more reliable reply if you use the Aleph mailing list.

Zach

On Thursday, October 25, 2012 4:09:40 AM UTC-7, Marko Topolnik wrote:
>
> I use lamina channels in a library that maintains multiple event streams.
> The event source is the Asterisk Management Interface, whose raw events are
> processed, filtered, collated, and finally pushed into appropriate lamina
> channels.
>
> On the client side of my library I want to expose the event streams as a
> web service. My challenge is to implement long polling: a request either
> finds events already enqueued, returning immediately; or blocks until an
> event occurs (with a timeout), then accumulates any further events for a
> short period, then returns the accumulated events. This reduces network
> overhead for the typical case of events occurring in short bursts (a single
> user action triggers several events).
>
> I have implemented this behavior in a piece of code that occupies its
> thread for the entire duration of the request:
>
> (require
>   '[lamina.core :as m]
>   '[lamina.core.graph.node :as node]
>   '[lamina.core.channel :as chan])
>
> (let [evs (node/drain (chan/emitter-node ch))
>       evs (if (seq evs)
>             evs
>             (try (let [ev @(m/with-timeout POLL-TIMEOUT (m/read-channel ch))]
>                    (Thread/sleep EVENT-BURST-PERIOD)
>                    (conj (node/drain (chan/emitter-node ch)) ev))
>                  (catch TimeoutException _ nil)))]
>   (vec evs))
>
> I would instead like to do this without blocking the thread. Ideally, I'll
> use aleph to implement the web service and connect a downstream HTTP
> response channel to my library's lamina channel. What I'm missing is, how
> do I disconnect the downstream channel without breaking anything in the
> upstream channel? I want to cleanly disconnect it, let my channel enqueue
> any further events, then later connect another aleph channel, which will
> drain those events with no loss.