On Thursday, September 3, 2015 at 9:15:54 AM UTC-4, Andy- wrote:
>
> Only to answer the "retry on error" part of your question: You might like 
> hara/event:
> http://docs.caudate.me/hara/hara-event.html
>
Thanks for the tip! I've been meaning to check out hara anyway. If I end up 
using it I'll post a follow-up.

>
> HTH
>
> On Wednesday, September 2, 2015 at 8:44:07 PM UTC-4, jo...@signafire.com 
> wrote:
>>
>> TLDR: how do you use Component when the application logic involves 
>> retrying failed components?
>>
>> Background:
>> I'm writing an app that consumes events from a streaming HTTP connection 
>> and writes those events to a message queue (see Code Illustration #1). It 
>> seems like that could be captured easily with three components —an HTTP 
>> stream, a message queue connection, and a "shoveler" that depends on the 
>> other two (see Code Illustration #2)— *but* the reconnection 
>> requirements complicate things…
>> The HTTP connection may be closed at any time by the server; if that 
>> happens, the app should persistently attempt to reconnect using an 
>> exponential back-off pattern. In addition, if the app goes thirty seconds 
>> without receiving any data, it should close the connection and try to 
>> reconnect. (see Code Illustration #3) It's not clear to me how to best 
>> express these "retry" requirements in the component lifecycle. Like, is it 
>> blasphemous for a component to be calling stop and start on its injected 
>> dependencies?
>>
>> Some possible approaches:
>>
>>    - Throw different kinds of exceptions to indicate what should happen 
>>    (using namespaced keywords, perhaps?), handled by whoever calls 
>>    component/start on the system-map.
>>       - The exception provides the component and system at the time of the 
>>       exception, enabling a sort of "resume" capability.
>>       - I'm under the impression that relying on exceptions for control 
>>       flow is an anti-pattern.
>>    - Create a sort of custom system implementation, one that goes beyond 
>>    calling start on its components in dependency order to monitor 
>>    failures and direct retries "appropriately".
>>       - "A system is a component which knows how to start and stop other 
>>       components." (from the README)
>>       So the fact that we want the shoveler component to be capable of 
>>       restarting the HTTP component indicates that the shoveler should 
>>       actually be considered a system. (right?)
>>          - If the HTTP stream is *injected* into the shoveler as a 
>>          dependency, how is it possible for the shoveler to stop the 
>>          HTTP stream and then start it again *with* any dependencies the 
>>          stream may have?
>>    - Ensure that every component/Lifecycle method implementation is 
>>    idempotent, so that I can get good-enough "restart" semantics by just 
>>    calling start-system again.
>>       - I know that idempotence is generally a Good Thing anyway, but 
>>       using start-system as a panacea strikes me as crude.
>>    
>>
>> Code Illustrations:
>>
>> 1. Rough sketch of app without timeout/retry logic or component:
>> (defn -main []
>>   (let [mq-conn (connect-to-queue mq-config)
>>         {event-stream :body} (http/get endpoint {:as :stream})]
>>     (with-open [rdr (clojure.java.io/reader event-stream)]
>>       (doseq [entity (line-seq rdr)]
>>         (write mq-conn entity)))))
>>
>> 2. Rough sketch of app with component but still without timeout/retry 
>> logic:
>> (defrecord EventStream [endpoint
>>                         http-config
>>                         stream]
>>   component/Lifecycle
>>   (start [this]
>>     (let [response (http/get endpoint (merge {:as :stream} http-config))]
>>       (assoc this :http-response response, :stream (:body response))))
>>   (stop [this]
>>     (.close stream)
>>     (-> this (dissoc :http-response) (assoc :stream nil))))
>>
>> (defrecord MessageQueue [config
>>                          connection]
>>   component/Lifecycle
>>   (start [this]
>>     (assoc this :connection (connect-to-queue config)))
>>   (stop [this]
>>     (.close connection)
>>     (assoc this :connection nil)))
>>
>> (defrecord Shoveler [source sink
>>                      worker]
>>   component/Lifecycle
>>   (start [this]
>>     ;; To avoid blocking indefinitely, we put the processing in a future.
>>     (assoc this :worker (future
>>                          (with-open [rdr (clojure.java.io/reader (:stream source))]
>>                            (doseq [entity (line-seq rdr)]
>>                              (write sink entity))))))
>>   (stop [this]
>>     (future-cancel worker)
>>     (assoc this :worker nil)))
>>
>> (defn -main []
>>   (-> (component/system-map :config (read-config)
>>                             :events (map->EventStream {:endpoint endpoint})
>>                             :mq-client (map->MessageQueue {})
>>                             :shoveler (map->Shoveler {}))
>>       (component/system-using {:events {:http-config :config}
>>                                :mq-client {:config :config}
>>                                :shoveler {:source :events
>>                                           :sink :mq-client}})
>>       component/start))
>>
>> 3. Rough sketch of desired *production* behavior, not using Component:
>> (defn -main []
>>   (let [mq-conn (connect-to-queue mq-config)]
>>     ;; ideally, the app is *always* ready to receive incoming events & put
>>     ;; them into the queue
>>     (while true
>>       (try
>>         (let [{event-stream :body} (loop [conn-timeout 1000]
>>                                      (try
>>                                        (http/get endpoint
>>                                                  {:as :stream
>>                                                   :conn-timeout conn-timeout
>>                                                   :socket-timeout 30000})
>>                                        (catch java.net.SocketTimeoutException e
>>                                          ;; an upper limit. 32 seconds was arbitrarily chosen
>>                                          (if (> conn-timeout 32000)
>>                                            (throw (SomeAppropriateException. "Service unavailable. Human attention needed." e))
>>                                            ;; I know that you can't actually `recur` inside of a `catch`, but I think
>>                                            ;; it's the clearest way to present this pseudocode
>>                                            (recur (* 2 conn-timeout))
>>   
>> ...
>
>
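
For what it's worth, the back-off loop in Code Illustration #3 can be written so that `recur` stays outside of `catch`: have the `try` return a tagged value and branch on it afterwards. This is only a minimal sketch under some assumptions. `retry-with-backoff` is a hypothetical helper name (not part of Component or of any HTTP library), it catches `java.net.SocketTimeoutException` only because the pseudocode does, and it gives up as soon as the timeout that just failed has reached the limit, which is slightly stricter than the `(> conn-timeout 32000)` check in the pseudocode.

```clojure
(defn retry-with-backoff
  "Calls (f timeout-ms). On SocketTimeoutException, doubles timeout-ms and
  tries again. Once a failed attempt has used a timeout of max-ms or more,
  throws an ex-info wrapping the last exception.
  Hypothetical helper, for illustration only."
  [f initial-ms max-ms]
  (loop [timeout-ms initial-ms]
    ;; The try returns [:ok value] or [:error exception], so the decision
    ;; to recur is made outside of the catch clause.
    (let [[status result] (try
                            [:ok (f timeout-ms)]
                            (catch java.net.SocketTimeoutException e
                              [:error e]))]
      (cond
        (= status :ok)
        result

        (>= timeout-ms max-ms)
        (throw (ex-info "Service unavailable. Human attention needed."
                        {:last-timeout-ms timeout-ms}
                        result))

        :else
        (recur (* 2 timeout-ms))))))
```

With the names from the illustrations, the call would look something like `(retry-with-backoff #(http/get endpoint {:as :stream :conn-timeout % :socket-timeout 30000}) 1000 32000)`. This only covers the connection attempt; it does not answer where such a loop should live in the component lifecycle.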

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en