Repository: storm Updated Branches: refs/heads/1.x-branch 0f227792c -> 66837b2a3
STORM-2979 WorkerHooks EOFException during run_worker_shutdown_hooks * commits squashed by Jungtaek Lim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5e305de0 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5e305de0 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5e305de0 Branch: refs/heads/1.x-branch Commit: 5e305de087a23a8527ac18a9b23fd4dbba313632 Parents: 0f22779 Author: michelo <michelo@michelo-desktop> Authored: Tue Apr 3 23:35:38 2018 +0200 Committer: Jungtaek Lim <[email protected]> Committed: Wed Apr 11 21:34:54 2018 +0900 ---------------------------------------------------------------------- .../src/clj/org/apache/storm/daemon/worker.clj | 29 ++++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/5e305de0/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index d4cfb1e..859a735 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -298,6 +298,7 @@ :executors executors :task-ids (->> receive-queue-map keys (map int) sort) :storm-conf storm-conf + :deserialized-worker-hooks (java.util.ArrayList.) :topology topology :system-topology (system-topology! storm-conf topology) :heartbeat-timer (mk-halting-timer "heartbeat-timer") @@ -573,23 +574,29 @@ (reset! latest-log-config new-log-configs) (log-debug "New merged log config is " @latest-log-config)))) -(defn run-worker-start-hooks [worker] +(defn deserialize-worker-hooks [worker] (let [topology (:topology worker) topo-conf (:storm-conf worker) worker-topology-context (worker-context worker) - hooks (.get_worker_hooks topology)] + hooks (.get_worker_hooks topology) + deserialized-worker-hooks (:deserialized-worker-hooks worker)] (dofor [hook hooks] (let [hook-bytes (Utils/toByteArray hook) deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)] - (.start deser-hook topo-conf worker-topology-context))))) + (.add deserialized-worker-hooks deser-hook))))) -(defn run-worker-shutdown-hooks [worker] +(defn run-worker-start-hooks [worker] (let [topology (:topology worker) - hooks (.get_worker_hooks topology)] - (dofor [hook hooks] - (let [hook-bytes (Utils/toByteArray hook) - deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)] - (.shutdown deser-hook))))) + topo-conf (:storm-conf worker) + worker-topology-context (worker-context worker) + deserialized-worker-hooks (:deserialized-worker-hooks worker)] + (dofor [hook deserialized-worker-hooks] + (.start hook topo-conf worker-topology-context)))) + +(defn run-worker-shutdown-hooks [worker] + (let [deserialized-worker-hooks (:deserialized-worker-hooks worker)] + (dofor [hook deserialized-worker-hooks] + (.shutdown hook)))) ;; TODO: should worker even take the storm-id as input? this should be ;; deducable from cluster state (by searching through assignments) @@ -598,6 +605,8 @@ (defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id] (log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id " and conf " conf) + ;; create an empty list to store deserialized hooks + (def deserialized-hooks (java.util.ArrayList.)) (if-not (local-mode? conf) (redirect-stdio-to-slf4j!)) ;; because in local mode, its not a separate @@ -646,6 +655,8 @@ _ (activate-worker-when-all-connections-ready worker) _ (refresh-storm-active worker nil) + + _ (deserialize-worker-hooks worker) _ (run-worker-start-hooks worker)
