[ https://issues.apache.org/jira/browse/HELIX-655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16005446#comment-16005446 ]
ASF GitHub Bot commented on HELIX-655:
--------------------------------------

Github user kongweihan commented on a diff in the pull request:

    https://github.com/apache/helix/pull/89#discussion_r115849569

--- Diff: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java ---
@@ -260,13 +260,72 @@ public Message getPendingState(String resourceName, Partition partition, String
     return partitionSet;
   }

+  /**
+   * Get the partitions count for each participant with the pending state and given resource state model
+   * @param resourceStateModel specified resource state model to look up
+   * @param state specified pending resource state to look up
+   * @return set of participants to partitions mapping
+   */
+  public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, String state) {
+    Map<String, Integer> pendingPartitionCount = new HashMap<String, Integer>();
+    for (String resource : _pendingStateMap.keySet()) {
+      String stateModel = _resourceStateModelMap.get(resource);
+      if (stateModel != null && stateModel.equals(resourceStateModel)
+          || stateModel == null && resourceStateModel == null) {
+        for (Partition partition : _pendingStateMap.get(resource).keySet()) {
+          Map<String, Message> partitionMessage = _pendingStateMap.get(resource).get(partition);
+          for (Map.Entry<String, Message> participantMap : partitionMessage.entrySet()) {
+            String participant = participantMap.getKey();
+            if (!pendingPartitionCount.containsKey(participant)) {
+              pendingPartitionCount.put(participant, 0);
+            }
+            String toState = participantMap.getValue().getToState();
+            if (toState != null && toState.equals(state) || toState == null && state == null) {
+              pendingPartitionCount.put(participant, pendingPartitionCount.get(participant) + 1);
+            }
+          }
+        }
+      }
+    }
+    return pendingPartitionCount;
+  }
+
+  /**
+   * Get the partitions count for each participant in the current state and with given resource state model
+   * @param resourceStateModel specified resource state model to look up
+   * @param state specified current resource state to look up
+   * @return set of participants to partitions mapping
+   */
+  public Map<String, Integer> getPartitionCountWithCurrentState(String resourceStateModel, String state) {
--- End diff --

    This is similar to the method above; would it be better to combine the two? I see that `_pendingStateMap` contains Messages instead of Strings, which makes it a bit hard to abstract. But given that its name is `_pendingStateMap`, shouldn't it contain the pending state rather than the message?
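For illustration, here is a minimal sketch of the kind of consolidation the comment suggests, assuming Java 8, imports of java.util.Objects and java.util.function.Function inside CurrentStateOutput, and a _currentStateMap field keyed like _pendingStateMap but holding plain state strings. The countPartitions helper and its extractor parameter are hypothetical names, not part of the PR:

{code:java}
// Sketch only: one shared walker over either state map, parameterized by how the
// target state is extracted from the map value (Message for pending, String for current).
private <T> Map<String, Integer> countPartitions(
    Map<String, Map<Partition, Map<String, T>>> stateMap, Function<T, String> toState,
    String resourceStateModel, String state) {
  Map<String, Integer> counts = new HashMap<>();
  for (Map.Entry<String, Map<Partition, Map<String, T>>> resourceEntry : stateMap.entrySet()) {
    // Keep only resources using the requested state model (either side may be null).
    if (!Objects.equals(_resourceStateModelMap.get(resourceEntry.getKey()), resourceStateModel)) {
      continue;
    }
    for (Map<String, T> perParticipant : resourceEntry.getValue().values()) {
      for (Map.Entry<String, T> entry : perParticipant.entrySet()) {
        counts.putIfAbsent(entry.getKey(), 0);            // participant is reported even with no match
        if (Objects.equals(toState.apply(entry.getValue()), state)) {
          counts.merge(entry.getKey(), 1, Integer::sum);  // one more partition in the target state
        }
      }
    }
  }
  return counts;
}

public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, String state) {
  // The pending map holds Messages, so extract the target state from the message.
  return countPartitions(_pendingStateMap, Message::getToState, resourceStateModel, state);
}

public Map<String, Integer> getPartitionCountWithCurrentState(String resourceStateModel, String state) {
  // The current-state map already holds plain state strings.
  return countPartitions(_currentStateMap, s -> s, resourceStateModel, state);
}
{code}

This keeps both public signatures intact while moving the nested iteration and the null-safe comparisons into one place, which would also make it easier to change what the pending map stores later.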
> Helix per-participant concurrent task throttling
> ------------------------------------------------
>
>                 Key: HELIX-655
>                 URL: https://issues.apache.org/jira/browse/HELIX-655
>             Project: Apache Helix
>          Issue Type: New Feature
>          Components: helix-core
>    Affects Versions: 0.6.x
>            Reporter: Jiajun Wang
>            Assignee: Junkai Xue
>
> h1. Overview
> Currently, all runnable jobs/tasks in Helix are treated equally and are scheduled according to the rebalancer algorithm. Their assignments may differ, but they all end up in the RUNNING state.
> This becomes a problem when too many jobs are runnable at the same time. When the Helix controller starts all of these jobs, the instances may be overloaded as they allocate resources and execute all the tasks, and the jobs cannot finish within a reasonable time window.
> The issue is even more critical for long-running jobs. According to our meeting with the Gobblin team, they allocate resources for a job as soon as it is scheduled. In the situation described above, more and more resources are reserved for pending jobs, and the cluster is soon exhausted.
> To work around the problem, an application has to schedule jobs at a relatively low frequency (which is what Gobblin does today), at the cost of low utilization.
> A better way to fix this, at the framework level, is to throttle the jobs/tasks that run concurrently and to allow setting priorities for different jobs to control total execution time. Given the same number of jobs, the cluster stays in better condition, and the jobs running in it have a more predictable execution time.
> Existing related control mechanisms are:
> * ConcurrentTasksPerInstance for each job
> * ParallelJobs for each workflow
> * The thread pool limit on the participant, if the user customizes TaskStateModelFactory
> But none of these directly helps when the number of concurrent workflows or jobs is large. If an application keeps scheduling jobs/job queues, Helix starts every runnable job without considering the workload on the participants.
> The application might be able to configure these items carefully to achieve the goal, but it cannot easily find the sweet spot, especially while the cluster is changing (scaling out, etc.).
> h2. Problem summary
> # All runnable tasks start executing, which may overload the participants.
> # Applications need a mechanism to prioritize important jobs (or workflows). Otherwise, important tasks may be blocked by less important ones, and the resources allocated to them are wasted.
> h2. Feature proposed
> Based on our discussion, we propose two features to help resolve the issue:
> # Running-task throttling on each participant, to avoid overload.
> # Job priority control, to ensure that high-priority jobs are scheduled earlier.
> In addition, applications can leverage the workflow/job monitoring items as feedback from Helix to adjust their strategy.
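The first two knobs listed under "Existing related control mechanisms" above are set through the task framework's configuration builders. Below is a minimal sketch, assuming the 0.6.x task API; the builder method names (setNumConcurrentTasksPerInstance, setParallelJobs, setWorkflowConfig, addJob) are recalled from that API and, like the workflow/job/command names, should be verified against the release in use:

{code:java}
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;

// Sketch only: caps tasks of one job per participant and jobs of one workflow running at once,
// using the existing knobs the description names.
public class ExistingThrottleKnobs {
  public static Workflow buildThrottledWorkflow() {
    JobConfig.Builder job = new JobConfig.Builder()
        .setCommand("MyTask")                   // hypothetical registered task command
        .setNumConcurrentTasksPerInstance(4);   // ConcurrentTasksPerInstance: per-job cap per participant

    WorkflowConfig.Builder workflowCfg = new WorkflowConfig.Builder()
        .setParallelJobs(2);                    // ParallelJobs: jobs of this workflow allowed to run together

    return new Workflow.Builder("myWorkflow")   // hypothetical workflow name
        .setWorkflowConfig(workflowCfg.build())
        .addJob("myJob", job)                   // hypothetical job name
        .build();
  }
}
{code}

The third knob, the participant-side thread pool, applies only when the application builds its own TaskStateModelFactory. And as the description points out, none of these limits the total number of tasks one participant runs across concurrent workflows, which is the gap the proposed per-participant throttling targets.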