troy ding created STORM-588:
-------------------------------
Summary: Executor-Level Rebalance Mechanism
Key: STORM-588
URL: https://issues.apache.org/jira/browse/STORM-588
Project: Apache Storm
Issue Type: Improvement
Affects Versions: 0.10.0, 0.9.3-rc2
Reporter: troy ding
Assignee: troy ding
I. The motivation
The current rebalance mechanism is implemented at the worker level. When a
rebalance operation is triggered (e.g. by adding or removing a worker), Storm
kills every worker whose assignment has changed and relaunches it according to
the new assignment. The advantage of this mechanism is the simplicity of the
implementation, but it can incur _huge_ overhead. The restart latency is
usually more than one second, which makes it almost impossible for the system
to recover under a high incoming data rate. In practice, no system
administrator dares to call rebalance, especially when the system is already
overloaded. To bring back the real benefits of the rebalance operation, we
believe it is important to address the following problems:
*1. Resource wastage and additional initialization cost*: In most cases, a
change to a worker’s assignment (if the worker is not killed) affects only a
small fraction of the executors running on it. Only some of them need to be
migrated or created, while the rest could keep running on the same worker. The
current implementation, however, forcibly restarts all the executors and
performs unnecessary initialization (i.e. calls IBolt.prepare() and
ISpout.open()) for most of the running tasks. This not only wastes the work
already done by unaffected executors, but also amplifies the initialization
cost in certain cases, e.g. when a bolt loads an index during preparation.
*2. Restarting workers causes avoidable in-memory data loss*: Currently, a
supervisor uses the “kill -9” command to terminate its workers. Consequently,
the tasks on such a worker have no chance to save their in-memory data. The
running state of the worker, including information that would be important for
resuming its duty, is simply lost, potentially causing unnecessary
recomputation of that state.
*3. JVM restart cost: long start-up time and loss of HotSpot optimizations*:
Restarting a JVM involves a long initialization procedure and throws away all
the runtime optimizations already applied to the application bytecode. The
HotSpot JVM detects performance-critical sections of the code and dynamically
compiles the bytecode of these hot spots into native machine code; CPU-bound
tasks in particular benefit greatly from this. If we simply kill the worker,
all of these advantages are lost.
II. Proposed solutions
1. At the supervisor side:
The current supervisor implementation periodically calls the “sync-processes”
function to check whether a live worker should be killed, which happens when:
(1) the mapping between the worker and the topology has changed (e.g. the
worker has been re-assigned to another topology, or the topology it serves has
been killed); or (2) the worker’s assignment has been updated (e.g. the
parallelism of some bolts has increased or decreased).
In order to reuse the worker’s JVM instance as much as possible, we propose
not to kill the workers that fall under condition (2), but only those that no
longer belong to the topology (condition (1)).
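As an illustration only (the actual logic lives in the supervisor’s Clojure code, e.g. “sync-processes” in supervisor.clj), the proposed decision could look roughly like the Java sketch below; WorkerState, NewAssignment and their fields are hypothetical stand-ins, not existing Storm types:
{code:java}
import java.util.Set;

// Hypothetical sketch of the proposed supervisor-side decision: a worker is
// killed only when its topology mapping changed (condition 1); a changed
// executor assignment (condition 2) is left for the worker itself to resolve.
public final class RebalanceDecision {

    // Minimal stand-in for the state the supervisor tracks per worker.
    public static final class WorkerState {
        final String topologyId;          // topology this worker currently serves
        final Set<Integer> executorIds;   // executors currently running on the worker
        WorkerState(String topologyId, Set<Integer> executorIds) {
            this.topologyId = topologyId;
            this.executorIds = executorIds;
        }
    }

    // Minimal stand-in for the newly scheduled assignment read from ZooKeeper.
    public static final class NewAssignment {
        final String topologyId;          // topology assigned to the slot, null if unassigned
        final Set<Integer> executorIds;   // executors that should run on the worker
        NewAssignment(String topologyId, Set<Integer> executorIds) {
            this.topologyId = topologyId;
            this.executorIds = executorIds;
        }
    }

    // Condition (1): the worker no longer serves the right topology -> kill it.
    public static boolean shouldKill(WorkerState current, NewAssignment target) {
        return target.topologyId == null
                || !target.topologyId.equals(current.topologyId);
    }

    // Condition (2): same topology but a different executor set -> keep the JVM
    // and let the worker-side sync thread add/remove executors in place.
    public static boolean needsExecutorSync(WorkerState current, NewAssignment target) {
        return !shouldKill(current, target)
                && !target.executorIds.equals(current.executorIds);
    }
}
{code}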
2. At the worker side:
Because the JVM instance is reused, each worker needs to periodically
synchronize its set of assigned executors. To achieve this, a new thread,
similar to the existing “refresh-connections” thread, is launched to shut down
executors that are no longer assigned to the worker and to start newly
assigned ones. Note that, in practice, the “refresh-connections” thread
already retrieves the assignment information from ZooKeeper, and this
information can be shared with the new thread, which reduces the load on
ZooKeeper.
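A rough sketch of the loop this new thread could run is shown below; ExecutorSyncTask, RunningExecutor, fetchAssignedExecutorIds() and launchExecutor() are hypothetical placeholders rather than existing Storm APIs, and in the real implementation the assignment would be shared with the “refresh-connections” thread instead of being fetched separately:
{code:java}
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the proposed worker-side executor synchronization.
public abstract class ExecutorSyncTask implements Runnable {

    // Executors currently running in this worker, keyed by executor id.
    private final Map<Integer, RunningExecutor> running = new ConcurrentHashMap<>();

    // Executor ids assigned to this worker in the latest assignment
    // (shared with the "refresh-connections" thread in practice).
    protected abstract Set<Integer> fetchAssignedExecutorIds();

    // Starts a newly assigned executor inside the same JVM.
    protected abstract RunningExecutor launchExecutor(int executorId);

    @Override
    public void run() {
        Set<Integer> assigned = fetchAssignedExecutorIds();

        // Shut down executors that are no longer assigned to this worker.
        for (Integer id : new HashSet<>(running.keySet())) {
            if (!assigned.contains(id)) {
                RunningExecutor removed = running.remove(id);
                if (removed != null) {
                    removed.shutdown();
                }
            }
        }
        // Start executors that are newly assigned to this worker.
        for (Integer id : assigned) {
            running.computeIfAbsent(id, this::launchExecutor);
        }
    }

    public interface RunningExecutor {
        void shutdown();
    }
}
{code}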
Because the binding between running executors and the worker can now change,
tuples also need to be re-routed. To achieve this, we need to rewrite two
functions, “transfer-local-fn” and “transfer-fn” (the rewrite is mandatory
because these two functions are immutable in the current implementation).
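To illustrate the intent of the rewrite (this is only a sketch; the actual transfer functions are Clojure code in worker.clj): instead of closing over a fixed task-to-queue mapping when the worker starts, the transfer path would consult a mapping that can be swapped when the assignment changes. LocalRouter and its methods below are hypothetical names:
{code:java}
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the rerouting idea behind rewriting
// "transfer-local-fn"/"transfer-fn": the router consults a task -> queue
// mapping that the executor-sync step can replace atomically after a rebalance.
public final class LocalRouter<T> {

    // Current task-id -> receive-queue mapping; replaced wholesale on rebalance.
    private final AtomicReference<Map<Integer, BlockingQueue<T>>> taskToQueue =
            new AtomicReference<>();

    public LocalRouter(Map<Integer, BlockingQueue<T>> initial) {
        taskToQueue.set(initial);
    }

    // Called once the executors for the new assignment are in place.
    public void updateAssignment(Map<Integer, BlockingQueue<T>> newMapping) {
        taskToQueue.set(newMapping);
    }

    // Local transfer: deliver a tuple to the queue of the target task, or
    // report that the task is no longer local so the caller can fall back to
    // the remote transfer path.
    public boolean transferLocal(int targetTaskId, T tuple) throws InterruptedException {
        BlockingQueue<T> queue = taskToQueue.get().get(targetTaskId);
        if (queue == null) {
            return false; // task moved to another worker; use remote transfer
        }
        queue.put(tuple);
        return true;
    }
}
{code}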
Another function that needs careful modification is
“WorkerTopologyContext.getThisWorkerTasks()”, because (defn- mk-grouper …
:local-or-shuffle) in “executor.clj” depends on this function to obtain the
required context information. Moreover, if an end user calls
“WorkerTopologyContext.getThisWorkerTasks()” in “prepare()” and stores the
result, and the executor is not restarted after a rebalance, using the stored
result may lead to inconsistency.
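For example, user code of the following (hypothetical) form is affected, because the worker-task list snapshotted in prepare() would silently become stale once executors can be added to or removed from the worker without a restart:
{code:java}
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

// Illustrative bolt (not taken from any real topology) showing the caching pitfall.
public class WorkerTaskAwareBolt extends BaseRichBolt {

    private TopologyContext context;
    private List<Integer> cachedWorkerTasks; // stale after an in-place rebalance

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.context = context;
        // Snapshot taken once: fine today because the worker is always restarted,
        // but unsafe if executors can join/leave the worker at runtime.
        this.cachedWorkerTasks = context.getThisWorkerTasks();
    }

    @Override
    public void execute(Tuple input) {
        // Safer pattern, assuming (as this issue proposes) the context is kept
        // up to date: query it on each use instead of relying on the snapshot.
        List<Integer> currentWorkerTasks = context.getThisWorkerTasks();
        // ... use currentWorkerTasks rather than cachedWorkerTasks ...
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output fields needed for this illustration
    }
}
{code}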
In summary, we propose this new executor-level rebalance mechanism, which
tries to maximize resource reuse and minimize the rebalance cost. This is
essential for the whole system, and especially important for the ultimate goal
of providing elasticity features in Storm.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)