[
https://issues.apache.org/jira/browse/STORM-588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rick Kellogg updated STORM-588:
-------------------------------
Component/s: storm-core
> Executor-Level Rebalance Mechanism
> ----------------------------------
>
> Key: STORM-588
> URL: https://issues.apache.org/jira/browse/STORM-588
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Affects Versions: 0.10.0, 0.9.3-rc2
> Reporter: troy ding
> Assignee: troy ding
>
> I. The motivation
> The current rebalance mechanism operates at the worker level. When a
> rebalance operation is triggered (e.g. by adding/removing a worker), Storm
> kills every worker whose assignment has changed and relaunches it according
> to the new assignment. The advantage of this mechanism is the simplicity of
> its implementation, but it can incur huge overhead: the restart latency is
> usually more than one second, making it almost impossible for the system to
> recover under a high incoming data rate. No system administrator dares to
> call rebalance, especially when the system is overloaded! To bring back the
> real benefits of the rebalance operation, we believe it is important to
> address the following problems:
> *1. Resource waste and additional initialization cost*: In most cases, the
> changes to a worker's assignment (if the worker is not killed) affect only
> a small fraction of the executors running on it. Only some of them need to
> be migrated or created, while the rest could keep running on the same
> worker. The current implementation, however, forcefully restarts all the
> executors and triggers unnecessary initializations (i.e. calls to
> Bolt.prepare() and Spout.open()) for most of the running tasks. This not
> only wastes the computational resources of unaffected executors, but also
> amplifies the initialization cost in certain cases, e.g. when a bolt loads
> an index during preparation.
> *2. Restarting workers causes avoidable in-memory data loss*: Currently, a
> supervisor uses the "kill -9" command to kill the corresponding worker.
> Consequently, the tasks on this worker have no chance to save their data.
> The running state of the workers, including information needed to resume
> their duties, is simply lost, potentially causing unnecessary recomputation
> of that state.
> *3. JVM restart cost: long startup time and loss of HotSpot optimizations*:
> Restarting a JVM involves a long initialization procedure and loses all the
> runtime optimizations accumulated for the application bytecode. The HotSpot
> JVM detects performance-critical sections of the code and dynamically
> translates the Java bytecode of these hot spots into native machine code;
> CPU-bound tasks in particular benefit greatly from this feature. If we
> simply kill the worker, all of these advantages are lost.
> II. Proposed solutions
> 1. At the supervisor side:
> The current supervisor implementation periodically calls the
> "sync-processes" function to check whether a live worker should be killed.
> A worker is killed when: (1) the mapping between the worker and the
> topology has changed (e.g. the worker is re-assigned to another topology,
> or the topology it serves is killed); or (2) the worker's assignment has
> been updated (e.g. the parallelism of some bolts increases/decreases).
> In order to reuse the worker's JVM instance as much as possible, we propose
> not killing the workers matching condition (2), and killing only those that
> no longer belong to the topology (condition (1)).
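> The proposed supervisor-side decision can be sketched as follows (a
> minimal, language-agnostic sketch in Python; the map layout and names are
> illustrative, not Storm's actual data structures):

```python
def workers_to_kill(current, new):
    """Proposed supervisor rule: kill a worker only under condition (1),
    i.e. it no longer serves the same topology.  Workers whose executor
    assignment merely changed (condition (2)) are kept alive.

    `current` and `new` are hypothetical {worker_id: topology_id} maps.
    """
    return [w for w, topo in current.items() if new.get(w) != topo]

# Example: w2 is re-assigned to another topology, w3's topology is gone,
# while w1 keeps running (even if its executor assignment changed).
current = {"w1": "topo-A", "w2": "topo-A", "w3": "topo-B"}
new = {"w1": "topo-A", "w2": "topo-C"}
print(workers_to_kill(current, new))  # ['w2', 'w3']
```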
> 2. At the worker side:
> Because the JVM instance is reused, each worker needs to periodically
> synchronize its set of assigned executors. To achieve this, a new thread,
> similar to the existing "refresh-connections" thread, is launched to stop
> executors that are no longer assigned and to start newly assigned ones.
> Note that, in practice, the "refresh-connections" thread already retrieves
> the assignment information from ZooKeeper; this information can be shared
> with the new thread, reducing the load on ZooKeeper.
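> The synchronization performed by this new thread amounts to a set
> difference between the currently running executors and the newly assigned
> ones. A minimal sketch (illustrative only, not Storm's actual code):

```python
def sync_executors(running, assigned):
    """Diff the executors currently running in this worker's JVM against
    the new assignment read from ZooKeeper.

    Returns (to_stop, to_start, kept): executors in both sets keep
    running in the same JVM, preserving their in-memory state and
    HotSpot-compiled code.
    """
    to_stop = running - assigned    # no longer assigned to this worker
    to_start = assigned - running   # newly assigned, must be launched
    kept = running & assigned       # untouched by the rebalance
    return to_stop, to_start, kept

# Example: executor 3 migrates away, executor 4 arrives, 1 and 2 survive.
print(sync_executors({1, 2, 3}, {1, 2, 4}))  # ({3}, {4}, {1, 2})
```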
> Because the binding between running executors and the worker can change,
> tuples must also be re-routed. To fulfill this purpose, we need to rewrite
> the two functions "transfer-local-fn" and "transfer-fn" (the rewrite is
> mandatory because, in the current implementation, these two functions are
> built once and never updated).
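> To illustrate why these functions must be rebuilt: a transfer function is
> essentially a closure over the worker's local task set, fixed at
> construction time. A hypothetical sketch of the problem (the names and the
> routing model are simplified stand-ins for Storm's internals):

```python
def mk_transfer_fn(local_tasks):
    """Hypothetical analogue of Storm's "transfer-fn": a closure that
    captures the worker's local task set once, at construction time."""
    local = frozenset(local_tasks)

    def transfer(task_id, tup):
        # Local tasks receive the tuple in-process; everything else is
        # sent over the network to another worker.
        return ("local", tup) if task_id in local else ("remote", tup)

    return transfer

transfer = mk_transfer_fn({1, 2})
print(transfer(1, "t"))   # ('local', 't')
print(transfer(5, "t"))   # ('remote', 't')

# After an executor-level rebalance moves task 5 onto this worker, the
# old closure would still route it remotely -- hence the proposal must
# regenerate the transfer functions whenever the assignment changes:
transfer = mk_transfer_fn({1, 2, 5})
print(transfer(5, "t"))   # ('local', 't')
```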
> Another function that needs careful modification is
> "WorkerTopologyContext.getThisWorkerTasks()", because (defn- mk-grouper ...
> :local-or-shuffle) in "executor.clj" depends on this function to obtain the
> required context information. Consequently, if an end user calls
> "WorkerTopologyContext.getThisWorkerTasks()" in "prepare()" and stores the
> result, and the executor is not restarted after a rebalance, using the
> stored result may lead to inconsistency.
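> The hazard can be illustrated with a toy context object (illustrative
> names only; this is not Storm's actual implementation):

```python
class ToyWorkerContext:
    """Toy stand-in for WorkerTopologyContext: the worker updates the
    task list in place when an executor-level rebalance happens."""

    def __init__(self, tasks):
        self._tasks = list(tasks)

    def get_this_worker_tasks(self):
        return list(self._tasks)  # fresh view of the current assignment

    def apply_rebalance(self, tasks):
        self._tasks = list(tasks)

ctx = ToyWorkerContext([1, 2, 3])
cached = ctx.get_this_worker_tasks()   # snapshot stored in prepare()
ctx.apply_rebalance([1, 2])            # executor-level rebalance occurs
# The executor was not restarted, so prepare() never runs again, and the
# cached snapshot silently disagrees with the live context:
print(cached == ctx.get_this_worker_tasks())  # False
```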
> In summary, we propose this new executor-level rebalance mechanism, which
> tries to maximize resource reuse and minimize rebalance cost. This is
> essential for the whole system, and especially important for the ultimate
> goal of elasticity features in Storm.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)