GitHub user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138099459

    --- Diff: storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---
    @@ -961,14 +1015,25 @@
         ;; tasks figure out what tasks to talk to by looking at topology at runtime
         ;; only log/set when there's been a change to the assignment
         (doseq [[topology-id assignment] new-assignments
    -            :let [existing-assignment (get existing-assignments topology-id)
    -                  topology-details (.getById topologies topology-id)]]
    +            :let [existing-assignment (get existing-assignments topology-id)]]
           (if (= existing-assignment assignment)
             (log-debug "Assignment for " topology-id " hasn't changed")
             (do
               (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
               (.set-assignment! storm-cluster-state topology-id assignment)
               )))
    +
    +    ;; grouping assignment by node to see the nodes diff, then notify nodes/supervisors to synchronize its owned assignment
    +    ;; because the number of existing assignments is small for every scheduling round,
    +    ;; we expect to notify supervisors at almost the same time.
    +    (->> new-assignments
    +         (map (fn [[tid new-assignment]]
    +           (let [existing-assignment (get existing-assignments tid)]
    +             (assignment-changed-nodes existing-assignment new-assignment ))))
    +         (apply concat)
    +         (into #{})
    +         (notify-supervisors-assignments conf new-assignments))
    --- End diff --

    So if we put these pushes in a thread pool, with a timeout of 5 seconds or so for each push attempt, we should be able to get the best of both worlds.
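One possible reading of the suggestion, sketched below under stated assumptions: push to every changed supervisor concurrently, so notifications still go out at nearly the same time, while bounding how long Nimbus waits on a slow or unreachable supervisor. The class `AssignmentPusher`, its method `pushAll`, and the `push` callback are hypothetical names for illustration only; they are not Storm's API, and the real code path would sit behind `notify-supervisors-assignments`. The sketch uses the standard `ExecutorService.invokeAll(tasks, timeout, unit)`, which cancels any task still running when the timeout expires.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper (not part of Storm): parallel assignment pushes with a bounded wait. */
public class AssignmentPusher implements AutoCloseable {
    private final ExecutorService pool;

    public AssignmentPusher(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    /**
     * Calls push.accept(node) for every node in parallel and waits at most timeoutMs.
     * Returns node -> true only if that push completed in time without throwing.
     */
    public Map<String, Boolean> pushAll(Collection<String> nodes,
                                        Consumer<String> push,
                                        long timeoutMs) throws InterruptedException {
        List<String> order = new ArrayList<>(nodes);
        List<Callable<Void>> tasks = new ArrayList<>();
        for (String node : order) {
            tasks.add(() -> { push.accept(node); return null; });
        }
        // invokeAll returns futures in task order; unfinished tasks are cancelled at the deadline.
        List<Future<Void>> futures = pool.invokeAll(tasks, timeoutMs, TimeUnit.MILLISECONDS);
        Map<String, Boolean> result = new TreeMap<>();
        for (int i = 0; i < order.size(); i++) {
            boolean ok;
            try {
                futures.get(i).get(); // every future is already done or cancelled here
                ok = true;
            } catch (CancellationException | ExecutionException e) {
                ok = false; // the push timed out or threw
            }
            result.put(order.get(i), ok);
        }
        return result;
    }

    @Override
    public void close() {
        pool.shutdownNow();
    }
}
```

Design note: `invokeAll` applies one deadline to the whole batch, which behaves like a per-push timeout only when the pool has at least as many threads as there are nodes to notify; with a smaller pool, queued pushes get less than the full window. Cancellation works by interrupting the worker thread, and a push blocked in socket I/O may not respond to interruption, so a client-side socket timeout would likely still be needed alongside this.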
---