troy ding created STORM-588:
-------------------------------

             Summary: Executor-Level Rebalance Mechanism
                 Key: STORM-588
                 URL: https://issues.apache.org/jira/browse/STORM-588
             Project: Apache Storm
          Issue Type: Improvement
    Affects Versions: 0.10.0, 0.9.3-rc2
            Reporter: troy ding
            Assignee: troy ding


I. The motivation

The current rebalance mechanism is implemented at the worker level. When a 
rebalance operation is triggered (e.g. by adding/removing a worker), Storm 
kills every worker whose assignment has changed. In other words, the rebalance 
operation has to kill certain running workers and relaunch them according to 
the new assignment. The advantage of this mechanism is the simplicity of the 
implementation, but it can incur _huge_ overhead. In practice, the restart 
latency is usually more than one second, making it almost impossible for the 
system to recover under a high incoming data rate. No system administrator 
dares to call rebalance, especially when the system is overloaded! To bring 
back the real benefits of the rebalance operation, we believe it is important 
to address the following problems:

*1. Resource wastage and additional initialization cost*: In most cases, the 
changes to a worker’s assignment (if it is not killed) affect only a small 
fraction of the executors running on it. Only some of them need to be migrated 
or created, while the rest can keep running on the same worker. The current 
implementation, however, forcefully restarts all the executors and performs 
unnecessary re-initialization (i.e. calls Bolt.prepare() and Spout.open()) for 
most of the running tasks. This not only wastes the resources of unaffected 
executors, but also amplifies the initialization cost under certain conditions, 
e.g. when a bolt loads a large index in prepare(), as sketched below.
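To make that initialization cost concrete, here is a minimal sketch (not taken 
from any real topology, and assuming the 0.9.x backtype.storm API) of a bolt 
whose prepare() rebuilds a large in-memory index; the class name and the 
loadIndex() helper are hypothetical:

{code:java}
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class IndexLookupBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Map<String, String> index;   // large in-memory index

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Expensive step: re-executed for every executor whenever its worker is
        // killed and relaunched, even if this executor's own assignment is unchanged.
        this.index = loadIndex();
    }

    @Override
    public void execute(Tuple tuple) {
        String key = tuple.getString(0);
        collector.emit(tuple, new Values(key, index.get(key)));
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("key", "value"));
    }

    // Hypothetical loader standing in for heavy disk/network I/O.
    private Map<String, String> loadIndex() {
        return new HashMap<String, String>();
    }
}
{code}

Under the current mechanism, every such prepare() is re-run on every worker 
restarted by a rebalance, whether or not that executor actually moved.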

*2. Restarting workers causes avoidable in-memory data loss*: Currently, a 
supervisor uses the “kill -9” command to kill its corresponding workers. 
Consequently, the tasks on such a worker have no chance to save their task 
data. The running state of the worker, including information that is important 
for resuming its duty, is simply lost, potentially causing unnecessary 
recomputation of that state.

*3. JVM restart cost, long duration and loss of HotSpot optimizations*: 
Restarting a JVM involves a long initialization procedure and loses all the 
runtime optimizations accumulated for the application bytecode. As far as we 
know, the HotSpot JVM is capable of detecting the performance-critical sections 
of the code and dynamically translating the Java bytecode of these hot spots 
into native machine code. In particular, tasks that are CPU-bound can greatly 
benefit from this feature. If we directly kill the worker, all of these 
advantages are lost.

II. Proposed solutions

1. At the supervisor side:
The current supervisor implementation periodically calls the “sync-processes” 
function to check whether a live worker should be killed, which happens when: 
(1) the mapping between the worker and its topology has changed (e.g. the 
worker is re-assigned to another topology, or the topology it serves is 
killed); or (2) the worker’s assignment has been updated (e.g. the parallelism 
of some bolts increases/decreases). 

In order to reuse a worker’s JVM instance as much as possible, we propose not 
killing the workers that fall only under condition (2), and killing only those 
that no longer belong to the topology (condition (1)). A sketch of the modified 
decision follows.
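The sketch below is not the actual “sync-processes” code; the method, the 
Action enum and the executor-id representation are illustrative stand-ins that 
only show the proposed change to the kill decision:

{code:java}
import java.util.List;
import java.util.Set;

final class SyncDecision {
    enum Action { KEEP, KILL }

    static Action decide(String assignedTopologyId,            // topology now assigned to this slot, or null
                         String runningTopologyId,             // topology the live worker is serving
                         Set<List<Integer>> assignedExecutors, // executors the new assignment gives this worker
                         Set<List<Integer>> runningExecutors) {// executors currently running in the worker
        // Condition (1): the worker no longer belongs to its topology -> kill and relaunch.
        if (assignedTopologyId == null || !assignedTopologyId.equals(runningTopologyId)) {
            return Action.KILL;
        }
        // Previously, !assignedExecutors.equals(runningExecutors) (condition (2)) also
        // meant KILL. Under the proposal it does not: the worker keeps its JVM and
        // reconciles its executor set itself (see the worker-side sketch below).
        return Action.KEEP;
    }
}
{code}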

2. At the worker side: 
Because the JVM instance is reused, each worker needs to periodically 
synchronize its set of assigned executors. To achieve this, a new thread, 
similar to the existing “refresh-connections” thread, is launched to shut down 
executors that are no longer assigned to the worker and to start newly assigned 
ones. Note that, in practice, the “refresh-connections” thread already 
retrieves the assignment information from ZooKeeper, and this information can 
be shared with the new thread, which reduces the load on ZooKeeper.
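A rough sketch of such an executor-sync loop is given below. The Executor 
interface, the assignment source and the launcher function are hypothetical 
placeholders; in the real worker the assigned-executor set would be derived 
from the assignment data that the “refresh-connections” thread already reads 
from ZooKeeper:

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

final class ExecutorSyncLoop implements Runnable {
    interface Executor { void shutdown(); }

    private final Map<List<Integer>, Executor> running = new ConcurrentHashMap<>();
    private final Supplier<Set<List<Integer>>> assignmentSource;   // shared with refresh-connections
    private final Function<List<Integer>, Executor> launcher;      // starts a new executor in this worker

    ExecutorSyncLoop(Supplier<Set<List<Integer>>> assignmentSource,
                     Function<List<Integer>, Executor> launcher) {
        this.assignmentSource = assignmentSource;
        this.launcher = launcher;
    }

    @Override
    public void run() {
        Set<List<Integer>> assigned = assignmentSource.get();

        // Shut down executors that are no longer assigned to this worker.
        for (List<Integer> executorId : new HashSet<>(running.keySet())) {
            if (!assigned.contains(executorId)) {
                Executor removed = running.remove(executorId);
                if (removed != null) {
                    removed.shutdown();
                }
            }
        }
        // Start executors that are newly assigned to this worker;
        // unaffected executors keep running untouched.
        for (List<Integer> executorId : assigned) {
            running.computeIfAbsent(executorId, launcher);
        }
    }
}
{code}

Such a loop would be scheduled periodically (e.g. via a 
ScheduledExecutorService), in the same way the existing worker threads are 
driven by timers.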

Because the binding between the running executors and the worker can now 
change, tuples also need to be re-routed. To fulfill this purpose, we need to 
rewrite two functions, “transfer-local-fn” and “transfer-fn” (note that the 
rewrite is compulsory, because these two functions are immutable in the current 
implementation). 
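The sketch below shows the intended direction rather than the actual Clojure 
rewrite: the task-to-queue routing that “transfer-local-fn” consults is kept 
behind an AtomicReference so that the executor-sync step can swap it out after 
a rebalance, instead of being captured once when the worker starts. The class, 
the queue type and the tuple type are placeholders:

{code:java}
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

final class LocalTransfer<T> {
    // taskId -> inbound queue of the executor currently hosting that task
    private final AtomicReference<Map<Integer, Queue<T>>> routing;

    LocalTransfer(Map<Integer, Queue<T>> initialRouting) {
        this.routing = new AtomicReference<Map<Integer, Queue<T>>>(initialRouting);
    }

    // Called by the executor-sync thread after executors are added/removed.
    void updateRouting(Map<Integer, Queue<T>> newRouting) {
        routing.set(newRouting);
    }

    // The equivalent of transfer-local-fn: route a tuple to a local executor.
    void transferLocal(int taskId, T tuple) {
        Queue<T> q = routing.get().get(taskId);
        if (q != null) {
            q.offer(tuple);     // the task is (still) hosted by a local executor
        }
        // else: the task has moved off this worker; the remote transfer path
        // (the equivalent of transfer-fn) would take over.
    }
}
{code}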

Another function that needs careful modification is 
“WorkerTopologyContext.getThisWorkerTasks()”, because (defn- mk-grouper … 
:local-or-shuffle) in “executor.clj” depends on this function to obtain the 
required context information. Moreover, if an end user calls 
“WorkerTopologyContext.getThisWorkerTasks()” in “prepare()” and stores the 
result, and the executor has not been restarted, using that stored result may 
lead to inconsistency, as illustrated below.
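The user-facing hazard can be illustrated with a (hypothetical) bolt that 
caches the result in prepare(); under the current worker-level rebalance the 
cache is always rebuilt because the worker restarts, but with an in-place 
executor-level rebalance it would silently go stale unless refreshed:

{code:java}
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class CachingBolt extends BaseRichBolt {
    private List<Integer> workerTasks;   // cached once, never refreshed

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // TopologyContext extends WorkerTopologyContext, so this is the call
        // discussed above. Valid today because a reassignment restarts the worker;
        // stale after an in-place, executor-level rebalance.
        this.workerTasks = context.getThisWorkerTasks();
    }

    @Override
    public void execute(Tuple tuple) {
        // Any decision based on this.workerTasks may no longer match the
        // worker's real task set after an executor-level rebalance.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output streams in this illustration
    }
}
{code}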

In summary, we propose this new executor-level rebalance mechanism, which tries 
to maximize resource usage and minimize the rebalance cost. This is essential 
for the whole system, and especially important for the ultimate goal of 
elasticity features for Storm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
