If the upstream channel is permanent, then it won't close if all downstream 
channels are closed.  You can create a permanent channel using 
(permanent-channel) or (channel* :permanent? true).  Once you have that, 
you can replace all that code with a simple (siphon perm-ch conn-ch).
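A rough sketch of that lifecycle, assuming lamina is on the classpath. The names conn-ch and next-conn-ch are made up for illustration and stand in for whatever per-request channel the web layer hands you.

```clojure
(require '[lamina.core :refer [permanent-channel channel siphon
                               enqueue close closed?]])

;; Long-lived upstream channel owned by the library; it is never closed.
(def perm-ch (permanent-channel))

;; Hypothetical per-request channel, standing in for an HTTP response channel.
(def conn-ch (channel))

;; Forward messages from the upstream channel to the current connection.
(siphon perm-ch conn-ch)
(enqueue perm-ch :event-1)

;; Closing the downstream side should leave the upstream channel open,
;; so it can keep accepting events between requests.
(close conn-ch)
(enqueue perm-ch :event-2)

;; A later request attaches its own channel in the same way.
(def next-conn-ch (channel))
(siphon perm-ch next-conn-ch)
```

Checking (closed? perm-ch) after closing conn-ch is a quick way to confirm the upstream channel survived the disconnect.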

As an aside, though, you could also accomplish the above without delving 
into functions that are really implementation details.

  (if-let [evs (seq (channel->seq ch))]
    evs
    (run-pipeline (read-channel* ch :on-timeout ::timeout, :timeout POLL-TIMEOUT)
      (fn [msg]
        (when-not (= ::timeout msg)
          (conj (channel->seq ch) msg)))))

This is non-blocking, but will require some outer handler to deal with the 
async-result.
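One possible shape for that outer handler, as a sketch only. It relies on run-pipeline accepting either a plain value or an async-result as its initial value, so both branches of the if-let can be treated alike. handle-poll and send-response! are hypothetical names, not part of lamina or aleph.

```clojure
(require '[lamina.core :refer [run-pipeline]])

;; poll-result is whatever the polling expression returned: either a
;; plain seq of events or an async-result that will yield one later.
;; send-response! is a hypothetical callback that writes the HTTP response.
(defn handle-poll [poll-result send-response!]
  (run-pipeline poll-result
    (fn [evs]
      ;; evs is nil when the poll timed out, in which case an empty
      ;; vector is sent.
      (send-response! (vec evs)))))
```

With this, the calling thread is free as soon as handle-poll returns; the response is written whenever the events arrive or the timeout fires.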

Also, in the future you'll probably get a more reliable reply if you use 
the Aleph mailing list.

Zach
  

On Thursday, October 25, 2012 4:09:40 AM UTC-7, Marko Topolnik wrote:
>
> I use lamina channels in a library that maintains multiple event streams. 
> The event source is the Asterisk Management Interface, whose raw events are 
> processed, filtered, collated, and finally pushed into appropriate lamina 
> channels.
>
> On the client side of my library I want to expose the event streams as a 
> web service. My challenge is to implement long polling: a request either 
> finds events already enqueued, returning immediately; or blocks until an 
> event occurs (with a timeout), then accumulates any further events for a 
> short period, then returns the accumulated events. This reduces network 
> overhead for the typical case of events occurring in short bursts (a single 
> user action triggers several events).
>
> I have implemented this behavior in a piece of code that occupies its 
> thread for the entire duration of the request:
>
> (require
>       '[lamina.core :as m]
>       '[lamina.core.graph.node :as node]
>       '[lamina.core.channel :as chan])
>
> (let [evs (node/drain (chan/emitter-node ch))
>       evs (if (seq evs)
>             evs
>             (try (let [ev @(m/with-timeout POLL-TIMEOUT (m/read-channel ch))]
>                    (Thread/sleep EVENT-BURST-PERIOD)
>                    (conj (node/drain (chan/emitter-node ch)) ev))
>                  (catch TimeoutException _ nil)))]
>   (vec evs))
>
> I would instead like to do this without blocking the thread. Ideally, I'll 
> use aleph to implement the web service and connect a downstream HTTP 
> response channel to my library's lamina channel. What I'm missing is, how 
> do I disconnect the downstream channel without breaking anything in the 
> upstream channel? I want to cleanly disconnect it, let my channel enqueue 
> any further events, then later connect another aleph channel, which will 
> drain those events with no loss.
>
>
