Repository: storm
Updated Branches:
  refs/heads/1.1.x-branch ded3d0803 -> 2f150484d


STORM-2979 WorkerHooks EOFException during run_worker_shutdown_hooks

* commits squashed by Jungtaek Lim <kabh...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f50c8b7d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f50c8b7d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f50c8b7d

Branch: refs/heads/1.1.x-branch
Commit: f50c8b7d195eafd73816167f4634ca9b7151c11e
Parents: ded3d08
Author: michelo <michelo@michelo-desktop>
Authored: Tue Apr 3 23:35:38 2018 +0200
Committer: Jungtaek Lim <kabh...@gmail.com>
Committed: Wed Apr 11 21:34:18 2018 +0900

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/worker.clj  | 29 ++++++++++++++------
 1 file changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f50c8b7d/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 6626272..03826cd 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -285,6 +285,7 @@
       :executors executors
       :task-ids (->> receive-queue-map keys (map int) sort)
       :storm-conf storm-conf
+      :deserialized-worker-hooks (java.util.ArrayList.)
       :topology topology
       :system-topology (system-topology! storm-conf topology)
       :heartbeat-timer (mk-halting-timer "heartbeat-timer")
@@ -560,23 +561,29 @@
       (reset! latest-log-config new-log-configs)
       (log-debug "New merged log config is " @latest-log-config))))
 
-(defn run-worker-start-hooks [worker]
+(defn deserialize-worker-hooks [worker]
   (let [topology (:topology worker)
         topo-conf (:storm-conf worker)
         worker-topology-context (worker-context worker)
-        hooks (.get_worker_hooks topology)]
+        hooks (.get_worker_hooks topology)
+        deserialized-worker-hooks (:deserialized-worker-hooks worker)]
     (dofor [hook hooks]
       (let [hook-bytes (Utils/toByteArray hook)
             deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)]
-        (.start deser-hook topo-conf worker-topology-context)))))
+        (.add deserialized-worker-hooks deser-hook)))))
 
-(defn run-worker-shutdown-hooks [worker]
+(defn run-worker-start-hooks [worker]
   (let [topology (:topology worker)
-        hooks (.get_worker_hooks topology)]
-    (dofor [hook hooks]
-      (let [hook-bytes (Utils/toByteArray hook)
-            deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)]
-        (.shutdown deser-hook)))))
+        topo-conf (:storm-conf worker)
+        worker-topology-context (worker-context worker)
+        deserialized-worker-hooks (:deserialized-worker-hooks worker)]
+    (dofor [hook deserialized-worker-hooks]
+      (.start hook topo-conf worker-topology-context))))
+
+(defn run-worker-shutdown-hooks [worker]
+    (let [deserialized-worker-hooks (:deserialized-worker-hooks worker)]
+      (dofor [hook deserialized-worker-hooks]
+        (.shutdown hook))))
 
 ;; TODO: should worker even take the storm-id as input? this should be
 ;; deducable from cluster state (by searching through assignments)
@@ -585,6 +592,8 @@
 (defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]
   (log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id
                " and conf " conf)
+  ;; create an empty list to store deserialized hooks
+  (def deserialized-hooks (java.util.ArrayList.))
   (if-not (local-mode? conf)
     (redirect-stdio-to-slf4j!))
   ;; because in local mode, its not a separate
@@ -633,6 +642,8 @@
         _ (activate-worker-when-all-connections-ready worker)
 
         _ (refresh-storm-active worker nil)
+        
+        _ (deserialize-worker-hooks worker)
 
         _ (run-worker-start-hooks worker)
 

Reply via email to