TLDR: how do you use Component when the application logic involves retrying 
failed components?

Background:
I'm writing an app that consumes events from a streaming HTTP connection 
and writes those events to a message queue (see Code Illustration #1). It 
seems like that could be captured easily with three components —an HTTP 
stream, a message queue connection, and a "shoveler" that depends on the 
other two (see Code Illustration #2)— *but* the reconnection requirements 
complicate things…
The HTTP connection may be closed at any time by the server; if that 
happens, the app should persistently attempt to reconnect using an 
exponential back-off pattern. In addition, if the app goes thirty seconds 
without receiving any data, it should close the connection and try to 
reconnect. (see Code Illustration #3) It's not clear to me how to best 
express these "retry" requirements in the component lifecycle. Like, is it 
blasphemous for a component to be calling stop and start on its injected 
dependencies?

Some possible approaches:

   - Throw different kinds of exceptions to indicate what should happen 
   (using namespaced keywords, perhaps?), handled by whoever calls 
   component/start on the system-map.
   - The exception provides the component and system at the time of the 
      exception, enabling a sort of "resume" capability.
      - I'm under the impression that relying on exceptions for control 
      flow is an anti-pattern.
   - Create a sort of custom system implementation, one that goes beyond 
   calling start on its components in dependency order to monitor failures 
   and direct retries "appropriately".
      - "A system is a component which knows how to start and stop other 
      components." (from the README)
      So the fact that we want the shoveler component to be capable of 
      restarting the HTTP component indicates that the shoveler should actually 
      be considered a system. (right?)
         - If the HTTP stream is *injected* into the shoveler as a 
         dependency, how is it possible for the shoveler to stop the HTTP 
         stream and then start it again *with* any dependencies the stream 
         may have?
      - Ensure that every component/Lifecycle method implementation is 
   idempotent, so that I can get good-enough "restart" semantics by just 
   calling start-system again.
      - I know that idempotence is generally a Good Thing anyway, but using 
      start-system as a panacea strikes me as crude.
   

Code Illustrations:

1. Rough sketch of app without timeout/retry logic or component:
(defn -main []
  (let [mq-conn (connect-to-queue mq-config)
        {event-stream :body} (http/get endpoint {:as :stream})]
    (with-open [rdr (java.io/reader event-stream)]
      (doseq [entity (line-seq rdr)]
        (write mq-conn entity)))))

2. Rough sketch of app with component but still without timeout/retry logic:
(defrecord EventStream [endpoint
                        http-config
                        stream]
  component/Lifecycle
  (start [this]
    (let [response (http/get endpoint (merge {:as :stream} http-config))]
      (assoc this :http-response response, :stream (:body response)))
  (stop [this]
    (.close stream)
    (-> this (dissoc :http-response) (assoc :stream nil))))

(defrecord MessageQueue [config
                         connection]
  component/Lifecycle
  (start [this]
    (assoc this :connection (connect-to-queue config)))
  (stop [this]
    (.close connection)
    (assoc this :connection nil)))

(defrecord Shoveler [source sink
                     worker]
  component/Lifecycle
  (start [this]
    ;; To avoid blocking indefinitely, we put the processing in a future.
    (assoc this :worker (future
                         (with-open [rdr (java.io/reader (:stream source)]
                           (doseq [entity (line-seq rdr)]
                             (write sink entity)))))
  (stop [this]
    (future-cancel worker)
    (assoc this :worker nil)))

(defn -main []
  (-> (component/system-map :config (read-config)
                            :events (map->EventStream {:endpoint endpoint})
                            :mq-client (map->MessageQueue {})
                            :shoveler (map->Shoveler {}))
      (component/using {:events {:http-config :config}
                        :mq-client {:config :config}
                        :shoveler {:source :events
                                   :sink :mq-client}})
      component/start))

3. Rough sketch of desired *production* behavior, not using Component:
(defn -main []
  (let [mq-conn (connect-to-queue mq-config)]
    (while true  ; ideally, the app is *always* ready to receive incoming 
events & put them into the queue
      (try
        (let [{event-stream :body} (loop [conn-timeout 1000]
                                     (try
                                       (http/get endpoint
                                                 {:as :stream
                                                  :conn-timeout conn-timeout
                                                  :socket-timeout 30000})
                                       (catch java.net.
SocketTimeoutException e
                                         (if (> conn-timeout 32000)  ; an 
upper limit. 32 seconds was arbitrarily chosen
                                           (throw (SomeAppropriateException. 
"Service 
unavailable. Human attention needed." e))
                                           (recur (* 2 conn-timeout))  ; I 
know that you can't actually `recur` inside of a `catch`, but I think it's 
the clearest way to present this pseudocode
                                           ))))]
          (with-open [rdr (java.io/reader event-stream)]
            (doseq [entity (line-seq rdr)]
              (write mq-conn entity))))
        (catch java.net.SocketTimeoutException e
          (log/warn e "Didn't receive any data for thirty seconds. 
Reconnecting."))
        (catch java.net.SocketException e
          (log/warn e "Server closed the connection. Reconnecting."))
        ;; Any other exceptions *will* escape the retry loop
        ))))

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to