Another way I can think of decomposing this is to buffer/queue
communication between your two components, i.e a core.async channel. This
will decouple the two components allowing your MessageQueue to manage it's
own reconnection.

Interesting question about whether `start` calling `stop` is blasphemy or
not. I hope someone else can provide some insight on that.


On Wed, Sep 2, 2015 at 4:03 PM, <j...@signafire.com> wrote:

> TLDR: how do you use Component when the application logic involves
> retrying failed components?
>
> Background:
> I'm writing an app that consumes events from a streaming HTTP connection
> and writes those events to a message queue (see Code Illustration #1). It
> seems like that could be captured easily with three components —an HTTP
> stream, a message queue connection, and a "shoveler" that depends on the
> other two (see Code Illustration #2)— *but* the reconnection requirements
> complicate things…
> The HTTP connection may be closed at any time by the server; if that
> happens, the app should persistently attempt to reconnect using an
> exponential back-off pattern. In addition, if the app goes thirty seconds
> without receiving any data, it should close the connection and try to
> reconnect. (see Code Illustration #3) It's not clear to me how to best
> express these "retry" requirements in the component lifecycle. Like, is it
> blasphemous for a component to be calling stop and start on its injected
> dependencies?
>
> Some possible approaches:
>
>    - Throw different kinds of exceptions to indicate what should happen
>    (using namespaced keywords, perhaps?), handled by whoever calls
>    component/start on the system-map.
>    - The exception provides the component and system at the time of the
>       exception, enabling a sort of "resume" capability.
>       - I'm under the impression that relying on exceptions for control
>       flow is an anti-pattern.
>    - Create a sort of custom system implementation, one that goes beyond
>    calling start on its components in dependency order to monitor
>    failures and direct retries "appropriately".
>       - "A system is a component which knows how to start and stop other
>       components." (from the README)
>       So the fact that we want the shoveler component to be capable of
>       restarting the HTTP component indicates that the shoveler should 
> actually
>       be considered a system. (right?)
>          - If the HTTP stream is *injected* into the shoveler as a
>          dependency, how is it possible for the shoveler to stop the HTTP
>          stream and then start it again *with* any dependencies the
>          stream may have?
>       - Ensure that every component/Lifecycle method implementation is
>    idempotent, so that I can get good-enough "restart" semantics by just
>    calling start-system again.
>       - I know that idempotence is generally a Good Thing anyway, but
>       using start-system as a panacea strikes me as crude.
>
>
> Code Illustrations:
>
> 1. Rough sketch of app without timeout/retry logic or component:
> (defn -main []
>   (let [mq-conn (connect-to-queue mq-config)
>         {event-stream :body} (http/get endpoint {:as :stream})]
>     (with-open [rdr (java.io/reader event-stream)]
>       (doseq [entity (line-seq rdr)]
>         (write mq-conn entity)))))
>
> 2. Rough sketch of app with component but still without timeout/retry
> logic:
> (defrecord EventStream [endpoint
>                         http-config
>                         stream]
>   component/Lifecycle
>   (start [this]
>     (let [response (http/get endpoint (merge {:as :stream} http-config))]
>       (assoc this :http-response response, :stream (:body response)))
>   (stop [this]
>     (.close stream)
>     (-> this (dissoc :http-response) (assoc :stream nil))))
>
> (defrecord MessageQueue [config
>                          connection]
>   component/Lifecycle
>   (start [this]
>     (assoc this :connection (connect-to-queue config)))
>   (stop [this]
>     (.close connection)
>     (assoc this :connection nil)))
>
> (defrecord Shoveler [source sink
>                      worker]
>   component/Lifecycle
>   (start [this]
>     ;; To avoid blocking indefinitely, we put the processing in a future.
>     (assoc this :worker (future
>                          (with-open [rdr (java.io/reader (:stream source)]
>                            (doseq [entity (line-seq rdr)]
>                              (write sink entity)))))
>   (stop [this]
>     (future-cancel worker)
>     (assoc this :worker nil)))
>
> (defn -main []
>   (-> (component/system-map :config (read-config)
>                             :events (map->EventStream {:endpoint endpoint
> })
>                             :mq-client (map->MessageQueue {})
>                             :shoveler (map->Shoveler {}))
>       (component/using {:events {:http-config :config}
>                         :mq-client {:config :config}
>                         :shoveler {:source :events
>                                    :sink :mq-client}})
>       component/start))
>
> 3. Rough sketch of desired *production* behavior, not using Component:
> (defn -main []
>   (let [mq-conn (connect-to-queue mq-config)]
>     (while true  ; ideally, the app is *always* ready to receive incoming
> events & put them into the queue
>       (try
>         (let [{event-stream :body} (loop [conn-timeout 1000]
>                                      (try
>                                        (http/get endpoint
>                                                  {:as :stream
>                                                   :conn-timeout conn-
> timeout
>                                                   :socket-timeout 30000})
>                                        (catch java.net.
> SocketTimeoutException e
>                                          (if (> conn-timeout 32000)  ; an
> upper limit. 32 seconds was arbitrarily chosen
>                                            (throw (
> SomeAppropriateException. "Service unavailable. Human attention needed." e
> ))
>                                            (recur (* 2 conn-timeout))  ;
> I know that you can't actually `recur` inside of a `catch`, but I think
> it's the clearest way to present this pseudocode
>                                            ))))]
>           (with-open [rdr (java.io/reader event-stream)]
>             (doseq [entity (line-seq rdr)]
>               (write mq-conn entity))))
>         (catch java.net.SocketTimeoutException e
>           (log/warn e "Didn't receive any data for thirty seconds.
> Reconnecting."))
>         (catch java.net.SocketException e
>           (log/warn e "Server closed the connection. Reconnecting."))
>         ;; Any other exceptions *will* escape the retry loop
>         ))))
>
> --
> You received this message because you are subscribed to the Google
> Groups "Clojure" group.
> To post to this group, send email to clojure@googlegroups.com
> Note that posts from new members are moderated - please be patient with
> your first post.
> To unsubscribe from this group, send email to
> clojure+unsubscr...@googlegroups.com
> For more options, visit this group at
> http://groups.google.com/group/clojure?hl=en
> ---
> You received this message because you are subscribed to the Google Groups
> "Clojure" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to clojure+unsubscr...@googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to