James Xu created STORM-155:
------------------------------

             Summary: Storm rebalancing code causes multiple topologies assigned to a single port
                 Key: STORM-155
                 URL: https://issues.apache.org/jira/browse/STORM-155
             Project: Apache Storm (Incubating)
          Issue Type: Bug
            Reporter: James Xu


https://github.com/nathanmarz/storm/issues/551

We're seeing an issue where rebalancing topologies on a cluster causes workers to be assigned to multiple topologies, which in turn causes supervisors to fail. This can easily be reproduced locally by starting nimbus and a single supervisor with 4 worker slots, running several topologies, and rebalancing all of them to use 1 worker.

I tracked the issue to the mk-assignments function in nimbus.clj. In this function, the "existing-assignments" binding is assigned a list of all topologies with assignments except the one being rebalanced. The comment implies this is done to treat all the workers of the topology being rebalanced as unused, and that is what actually happens. However, because the topology being rebalanced is missing from the "existing-assignments" list, it is ignored completely by the scheduler and the rest of the code. As a result, all workers assigned to that topology are taken over by other topologies, but no changes are made to the topology being rebalanced itself, effectively leaving all of its workers assigned to 2 topologies.

I think that was not the intended behavior. I made a small change to instead treat all the workers of the topology being rebalanced as dead, causing them to be reassigned fairly between all the topologies. This seems to work well and reliably for us.

Let me know what you think.

--- a/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/src/clj/backtype/storm/daemon/nimbus.clj
@@ -437,8 +437,11 @@
   (into {} (for [[tid assignment] existing-assignments
                  :let [topology-details (.getById topologies tid)
                        all-executors (topology->executors tid)
+                       ;; for the topology which wants rebalance (specified by the scratch-topology-id)
+                       ;; we consider all its executors to be dead so they will be treated
+                       ;; as free slots in the scheduler code.
                        alive-executors (if (and scratch-topology-id (= scratch-topology-id tid))
-                                         all-executors
+                                         (set nil)
                                          (set (alive-executors nimbus topology-details all-executors assignment)))]]
              {tid alive-executors})))

@@ -638,11 +641,7 @@
         ;; read all the assignments
         assigned-topology-ids (.assignments storm-cluster-state nil)
         existing-assignments (into {} (for [tid assigned-topology-ids]
-                                        ;; for the topology which wants rebalance (specified by the scratch-topology-id)
-                                        ;; we exclude its assignment, meaning that all the slots occupied by its assignment
-                                        ;; will be treated as free slot in the scheduler code.
-                                        (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
-                                          {tid (.assignment-info storm-cluster-state tid nil)})))
+                                        {tid (.assignment-info storm-cluster-state tid nil)}))
         ;; make the new assignments for topologies
         topology->executor->node+port (compute-new-topology->executor->node+port
                                        nimbus

----------
xumingming: The following code needs to be synchronized (the call to mk-assignments):

(defn do-rebalance [nimbus storm-id status]
  (.update-storm! (:storm-cluster-state nimbus)
                  storm-id
                  (assoc-non-nil
                    {:component->executors (:executor-overrides status)}
                    :num-workers
                    (:num-workers status)))
  (mk-assignments nimbus :scratch-topology-id storm-id))
Otherwise it will cause a race condition here:
https://github.com/nathanmarz/storm/blob/master/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java#L264
and then one port will be assigned to multiple topologies.
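
A minimal sketch of that synchronization, assuming a shared monitor object; submit-lock below is hypothetical, and any lock also taken by the other callers of mk-assignments would do:

;; hypothetical sketch: serialize do-rebalance against other scheduling
;; rounds so two mk-assignments calls never run concurrently
(defonce ^:private submit-lock (Object.))

(defn do-rebalance [nimbus storm-id status]
  (locking submit-lock
    (.update-storm! (:storm-cluster-state nimbus)
                    storm-id
                    (assoc-non-nil
                      {:component->executors (:executor-overrides status)}
                      :num-workers
                      (:num-workers status)))
    ;; with the lock held the scheduler sees a consistent view and cannot
    ;; hand the same node+port to two topologies through this code path
    (mk-assignments nimbus :scratch-topology-id storm-id)))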

----------
stass: That's not the issue here. I initially thought it was a race condition as well, but the problem we had with rebalancing was in the sequential part of the algorithm (as described in the analysis above). In any case, the patch fixed the issue for us, and I have not seen any multiple assignments in the months we have been running with it.

----------
d2r: We are also hitting this issue and would welcome a fix.

----------
nathanmarz: The problem with the proposed patch is that it doesn't treat the 
slots of the topology that's rebalancing as free. It will certainly consider 
its executors as dead, but it won't make use of its slots during re-scheduling.

----------
stass: Hi Nathan, thanks for the comment.

If my reading of the code is right, the workers of the topology being 
rebalanced will be treated as free since they will be marked as dead (that's 
the purpose of the top hunk of the diff), and the scheduling code only looks at 
free workers when assigning workers to topologies. That is also what I observed 
in practice with this patch running.
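
A small REPL check of the mechanism, just as an illustration (it is not part of the patch):

;; with the top hunk applied, the rebalanced topology's alive-executors is
;; the empty set, so every executor it had counts as dead...
(set nil)                    ;=> #{}
(contains? (set nil) [1 1])  ;=> false, i.e. executor [1 1] is "not alive"
;; ...and the node+port slots those executors occupied come back to the
;; scheduler as free slots.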

Am I missing something?

----------
vinceyang: @nathanmarz, we are also hitting this issue. I read the nimbus code, and it seems that during a rebalance, in the mk-assignments function, the rebalancing topology's assignment info is out of date, but the supervisor does not know this. When another topology's assignment uses the same port as the out-of-date assignment, the problem occurs. If we remove the out-of-date assignment from ZK, the problem does not occur. If this idea is OK, I will work on a fix for this issue.
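
Roughly, the idea would look like the sketch below; remove-assignment! is only a placeholder name, since I have not checked what the cluster-state layer actually exposes for deleting a single assignment:

;; placeholder -- not an existing StormClusterState call as far as I know
(declare remove-assignment!)

(defn clear-stale-assignment! [nimbus scratch-topology-id]
  ;; drop the rebalanced topology's stale assignment before the new
  ;; assignments are computed, so supervisors never act on its old ports
  (when scratch-topology-id
    (remove-assignment! (:storm-cluster-state nimbus) scratch-topology-id)))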

----------
revans2: We have hit this issue too, and so I have been looking into it. It 
seems that it can happen in two different situations.

First, a topology is not assigned anything after it previously had slots assigned to it.

This happens most commonly when rebalancing, because the scheduler is not aware that the rebalanced topology previously had anything assigned to it, but I have also been able to reproduce it with other hacked-up schedulers.

When this happens, the supervisor in question will crash continuously until one of the topologies is killed.

The fix seems to be to include all assigned-topology-ids in topology->executor->node+port, with any topology that is missing mapping to nil.
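
In code, something like the following helper could do it; the name include-unassigned-topologies and the exact point in mk-assignments where it would be applied are just a sketch on my part:

;; sketch: make sure every previously assigned topology id appears in the
;; scheduler output, mapping to nil when the scheduler assigned it nothing,
;; so its old slots are explicitly released instead of silently kept
(defn include-unassigned-topologies
  [assigned-topology-ids topology->executor->node+port]
  (merge (into {} (for [tid assigned-topology-ids] {tid nil}))
         topology->executor->node+port))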

Second, the supervisor uses partially written scheduling data from ZK.

(.set-assignment! storm-cluster-state topology-id assignment) is atomic for a 
single topology, but not for multiple topologies. This means that the 
supervisor can read data from ZK that has had some topologies updated, but not 
all of them.

When this happens, the supervisor will crash and then come back up and recover, because by then the rest of the scheduling data has been written to ZK.

The fix for this seems to be that we need to "lock" zookeeper with a watch file 
during the update. The supervisors would not read the data until nimbus is done 
updating. I don't think this is as critical to fix because the supervisor 
recovers fairly quickly.
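
As a rough sketch of that idea: set-marker-node! and delete-marker-node! below are placeholders rather than the real cluster-state API; only .set-assignment! is the call quoted above.

;; placeholders for whatever the cluster-state layer actually offers
(declare set-marker-node! delete-marker-node!)

(defn write-assignments-atomically!
  [storm-cluster-state topology->assignment]
  ;; 1. publish a "scheduling in progress" marker before the bulk update
  (set-marker-node! storm-cluster-state "/assignments-lock")
  (try
    (doseq [[tid assignment] topology->assignment]
      (.set-assignment! storm-cluster-state tid assignment))
    (finally
      ;; 2. remove the marker; supervisors would only read once it is gone
      (delete-marker-node! storm-cluster-state "/assignments-lock"))))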

Does my analysis seem correct? I don't understand all of the code perfectly, so 
I want to be sure I am not missing something.



