[HELIX-617] Job IdealState is generated even when the job is not running, and is not removed when the job completes.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1798e793 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1798e793 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1798e793 Branch: refs/heads/helix-0.6.x Commit: 1798e793522157b1b479a66c8a9ec9453d698b8f Parents: 7bbb20b Author: Lei Xia <[email protected]> Authored: Wed Dec 9 14:02:45 2015 -0800 Committer: Lei Xia <[email protected]> Committed: Wed Dec 9 16:39:37 2015 -0800 ---------------------------------------------------------------------- .../java/org/apache/helix/model/IdealState.java | 8 +- .../FixedTargetTaskAssignmentCalculator.java | 163 +++ .../helix/task/FixedTargetTaskRebalancer.java | 163 --- .../task/GenericTaskAssignmentCalculator.java | 273 +++++ .../helix/task/GenericTaskRebalancer.java | 273 ----- .../org/apache/helix/task/JobRebalancer.java | 650 +++++++++++ .../helix/task/TaskAssignmentCalculator.java | 45 + .../java/org/apache/helix/task/TaskDriver.java | 205 ++-- .../org/apache/helix/task/TaskRebalancer.java | 1045 +++--------------- .../java/org/apache/helix/task/TaskUtil.java | 216 ++-- .../java/org/apache/helix/task/Workflow.java | 6 +- .../apache/helix/task/WorkflowRebalancer.java | 412 +++++++ .../integration/task/TestRecurringJobQueue.java | 78 +- .../integration/task/TestTaskRebalancer.java | 2 +- 14 files changed, 1971 insertions(+), 1568 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/model/IdealState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java index 696de7a..e7f6096 100644 --- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java +++ 
b/helix-core/src/main/java/org/apache/helix/model/IdealState.java @@ -31,9 +31,9 @@ import org.apache.helix.HelixConstants; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; import org.apache.helix.controller.rebalancer.Rebalancer; -import org.apache.helix.task.FixedTargetTaskRebalancer; -import org.apache.helix.task.GenericTaskRebalancer; +import org.apache.helix.task.JobRebalancer; import org.apache.helix.task.TaskRebalancer; +import org.apache.helix.task.WorkflowRebalancer; import org.apache.log4j.Logger; /** @@ -524,8 +524,8 @@ public class IdealState extends HelixProperty { default: String rebalancerName = getRebalancerClassName(); if (rebalancerName != null) { - if (rebalancerName.equals(FixedTargetTaskRebalancer.class.getName()) - || rebalancerName.equals(GenericTaskRebalancer.class.getName())) { + if (rebalancerName.equals(JobRebalancer.class.getName()) + || rebalancerName.equals(WorkflowRebalancer.class.getName())) { property = RebalanceMode.TASK; } else { property = RebalanceMode.USER_DEFINED; http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java new file mode 100644 index 0000000..8760524 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java @@ -0,0 +1,163 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * A TaskAssignmentCalculator for when a task group must be assigned according to partitions/states on a target
+ * resource. Here, tasks are co-located according to where a resource's partitions are, as well as
+ * (if desired) only where those partitions are in a given state.
+ */
+public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator {
+
+  /**
+   * Returns the ids of all task partitions of the job, one per target partition (see the private
+   * overload). Returns null when the target resource has no ideal state in the cache.
+   */
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+    return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx);
+  }
+
+  /**
+   * Assigns task partitions to the instances that currently host the matching target partitions.
+   * Returns an empty map when the target resource has no ideal state in the cache.
+   */
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
+      JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, ClusterDataCache cache) {
+    IdealState tgtIs = getTgtIdealState(jobCfg, cache);
+    if (tgtIs == null) {
+      return Collections.emptyMap();
+    }
+    Set<String> tgtStates = jobCfg.getTargetPartitionStates();
+    return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet,
+        jobContext);
+  }
+
+  /**
+   * Gets the ideal state of the target resource of this job
+   * @param jobCfg job config containing target resource id
+   * @param cache snapshot of the cluster containing the task and target resource
+   * @return target resource ideal state, or null
+   */
+  private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) {
+    String tgtResourceId = jobCfg.getTargetResource();
+    return cache.getIdealState(tgtResourceId);
+  }
+
+  /**
+   * Returns the set of all partition ids for a job.
+   * <p/>
+   * If a set of target partitions was explicitly specified in the config, that is used. Otherwise,
+   * we use the list of all partitions of the target resource. Target partitions that have no task
+   * partition yet are given a new id, which is recorded in {@code taskCtx}.
+   * @param tgtResourceIs ideal state of the target resource, may be null
+   * @param jobCfg the job configuration
+   * @param taskCtx the job context; updated with any newly allocated partition ids
+   * @return the task partition ids, or null if {@code tgtResourceIs} is null
+   */
+  private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg,
+      JobContext taskCtx) {
+    if (tgtResourceIs == null) {
+      return null;
+    }
+    Map<String, List<Integer>> currentTargets = taskCtx.getPartitionsByTarget();
+    SortedSet<String> targetPartitions = Sets.newTreeSet();
+    if (jobCfg.getTargetPartitions() != null) {
+      targetPartitions.addAll(jobCfg.getTargetPartitions());
+    } else {
+      targetPartitions.addAll(tgtResourceIs.getPartitionSet());
+    }
+
+    Set<Integer> taskPartitions = Sets.newTreeSet();
+    for (String pName : targetPartitions) {
+      taskPartitions.addAll(getPartitionsForTargetPartition(pName, currentTargets, taskCtx));
+    }
+    return taskPartitions;
+  }
+
+  /**
+   * Returns the task partition ids mapped to a target partition, allocating the next free id
+   * (the current partition count of the context) when none is mapped yet.
+   */
+  private static List<Integer> getPartitionsForTargetPartition(String targetPartition,
+      Map<String, List<Integer>> currentTargets, JobContext jobCtx) {
+    if (!currentTargets.containsKey(targetPartition)) {
+      int nextId = jobCtx.getPartitionSet().size();
+      jobCtx.setPartitionTarget(nextId, targetPartition);
+      return Lists.newArrayList(nextId);
+    } else {
+      return currentTargets.get(targetPartition);
+    }
+  }
+
+  /**
+   * Get partition assignments for the target resource, but only for the partitions of interest.
+   * @param currStateOutput The current state of the instances in the cluster.
+   * @param instances The instances.
+   * @param tgtIs The ideal state of the target resource.
+   * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
+   *          do not need to be in any specific state to be considered.
+   * @param includeSet The set of partitions to consider.
+   * @param jobCtx The job context, used to map target partition names to task partition ids.
+   * @return A map of instance vs set of partition ids assigned to that instance.
+   */
+  private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
+      CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
+      Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
+    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
+    for (String instance : instances) {
+      result.put(instance, new TreeSet<Integer>());
+    }
+
+    String tgtResourceName = tgtIs.getResourceName();
+    Map<String, List<Integer>> partitionsByTarget = jobCtx.getPartitionsByTarget();
+    for (String pName : tgtIs.getPartitionSet()) {
+      List<Integer> partitions = partitionsByTarget.get(pName);
+      if (partitions == null || partitions.isEmpty()) {
+        continue;
+      }
+      // Only the first task partition mapped to this target partition is assigned.
+      int pId = partitions.get(0);
+      if (!includeSet.contains(pId)) {
+        continue;
+      }
+      // Invariant for the instance loop; lookups below use this same partition key.
+      Partition tgtPartition = new Partition(pName);
+      for (String instance : instances) {
+        // Skip replicas that have a pending state-transition message.
+        Message pendingMessage =
+            currStateOutput.getPendingState(tgtResourceName, tgtPartition, instance);
+        if (pendingMessage != null) {
+          continue;
+        }
+        String state = currStateOutput.getCurrentState(tgtResourceName, tgtPartition, instance);
+        if (tgtStates == null || tgtStates.contains(state)) {
+          result.get(instance).add(pId);
+        }
+      }
+    }
+
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java deleted file mode 100644 index 4c013c0..0000000 --- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java +++ /dev/null @@ -1,163 +0,0 @@ -package org.apache.helix.task; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.helix.controller.stages.ClusterDataCache; -import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.Message; -import org.apache.helix.model.Partition; -import org.apache.helix.model.ResourceAssignment; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -/** - * A rebalancer for when a task group must be assigned according to partitions/states on a target - * resource. Here, tasks are colocated according to where a resource's partitions are, as well as - * (if desired) only where those partitions are in a given state. 
- */ -public class FixedTargetTaskRebalancer extends TaskRebalancer { - - @Override - public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, - WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) { - return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx); - } - - @Override - public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput, - ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg, - JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, - Set<Integer> partitionSet, ClusterDataCache cache) { - IdealState tgtIs = getTgtIdealState(jobCfg, cache); - if (tgtIs == null) { - return Collections.emptyMap(); - } - Set<String> tgtStates = jobCfg.getTargetPartitionStates(); - return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet, - jobContext); - } - - /** - * Gets the ideal state of the target resource of this job - * @param jobCfg job config containing target resource id - * @param cluster snapshot of the cluster containing the task and target resource - * @return target resource ideal state, or null - */ - private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) { - String tgtResourceId = jobCfg.getTargetResource(); - return cache.getIdealState(tgtResourceId); - } - - /** - * Returns the set of all partition ids for a job. - * <p/> - * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we - * use the list of all partition ids from the target resource. 
- */ - private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg, - JobContext taskCtx) { - if (tgtResourceIs == null) { - return null; - } - Map<String, List<Integer>> currentTargets = taskCtx.getPartitionsByTarget(); - SortedSet<String> targetPartitions = Sets.newTreeSet(); - if (jobCfg.getTargetPartitions() != null) { - targetPartitions.addAll(jobCfg.getTargetPartitions()); - } else { - targetPartitions.addAll(tgtResourceIs.getPartitionSet()); - } - - Set<Integer> taskPartitions = Sets.newTreeSet(); - for (String pName : targetPartitions) { - taskPartitions.addAll(getPartitionsForTargetPartition(pName, currentTargets, taskCtx)); - } - return taskPartitions; - } - - private static List<Integer> getPartitionsForTargetPartition(String targetPartition, - Map<String, List<Integer>> currentTargets, JobContext jobCtx) { - if (!currentTargets.containsKey(targetPartition)) { - int nextId = jobCtx.getPartitionSet().size(); - jobCtx.setPartitionTarget(nextId, targetPartition); - return Lists.newArrayList(nextId); - } else { - return currentTargets.get(targetPartition); - } - } - - /** - * Get partition assignments for the target resource, but only for the partitions of interest. - * @param currStateOutput The current state of the instances in the cluster. - * @param instances The instances. - * @param tgtIs The ideal state of the target resource. - * @param tgtStates Only partitions in this set of states will be considered. If null, partitions - * do not need to - * be in any specific state to be considered. - * @param includeSet The set of partitions to consider. - * @return A map of instance vs set of partition ids assigned to that instance. 
- */ - private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment( - CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs, - Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) { - Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>(); - for (String instance : instances) { - result.put(instance, new TreeSet<Integer>()); - } - - Map<String, List<Integer>> partitionsByTarget = jobCtx.getPartitionsByTarget(); - for (String pName : tgtIs.getPartitionSet()) { - List<Integer> partitions = partitionsByTarget.get(pName); - if (partitions == null || partitions.size() < 1) { - continue; - } - int pId = partitions.get(0); - if (includeSet.contains(pId)) { - for (String instance : instances) { - Message pendingMessage = - currStateOutput.getPendingState(tgtIs.getResourceName(), new Partition(pName), - instance); - if (pendingMessage != null) { - continue; - } - String s = - currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName), - instance); - String state = (s == null ? null : s.toString()); - if (tgtStates == null || tgtStates.contains(state)) { - result.get(instance).add(pId); - } - } - } - } - - return result; - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java new file mode 100644 index 0000000..e8d5f5d --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java @@ -0,0 +1,273 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.base.Function;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * This class does an assignment based on an automatic rebalancing strategy, rather than requiring
+ * assignment to target partitions and states of another resource
+ */
+public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
+  /** Reassignment policy for this algorithm */
+  private RetryPolicy _retryPolicy = new DefaultRetryReassigner();
+
+  /**
+   * Gives every task in the job config that has no partition yet a new partition id (recorded in
+   * {@code jobCtx}), then returns all partition ids known to the job context.
+   */
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+    Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
+    Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
+    for (TaskConfig taskCfg : taskMap.values()) {
+      String taskId = taskCfg.getId();
+      if (!taskIdMap.containsKey(taskId)) {
+        // Ids are allocated densely: the next id is the current number of partitions.
+        int nextPartition = jobCtx.getPartitionSet().size();
+        jobCtx.setTaskIdForPartition(nextPartition, taskId);
+      }
+    }
+    return jobCtx.getPartitionSet();
+  }
+
+  /**
+   * Computes a task assignment with {@link AutoRebalanceStrategy}, then lets the retry policy shift
+   * partitions that keep failing.
+   * @param prevAssignment the previous assignment of the job resource; must not be null, since it
+   *          also supplies the job resource name
+   * @return a map of participant name to the task partition ids assigned to it
+   */
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
+      final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, ClusterDataCache cache) {
+    // Gather input to the full auto rebalancing algorithm
+    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
+    states.put("ONLINE", 1);
+
+    // Only map partitions whose assignment we care about
+    final Set<TaskPartitionState> honoredStates =
+        Sets.newHashSet(TaskPartitionState.INIT, TaskPartitionState.RUNNING,
+            TaskPartitionState.STOPPED);
+    Set<Integer> filteredPartitionSet = Sets.newHashSet();
+    for (Integer p : partitionSet) {
+      TaskPartitionState state = (jobContext == null) ? null : jobContext.getPartitionState(p);
+      if (state == null || honoredStates.contains(state)) {
+        filteredPartitionSet.add(p);
+      }
+    }
+
+    // Transform from partition id to fully qualified partition name
+    List<Integer> partitionNums = Lists.newArrayList(partitionSet);
+    Collections.sort(partitionNums);
+    // prevAssignment is dereferenced unconditionally here, so it cannot be null below.
+    final String resourceId = prevAssignment.getResourceName();
+    List<String> partitions =
+        new ArrayList<String>(Lists.transform(partitionNums, new Function<Integer, String>() {
+          @Override
+          public String apply(Integer partitionNum) {
+            return resourceId + "_" + partitionNum;
+          }
+        }));
+
+    // Compute the current assignment
+    Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
+    for (Partition partition : currStateOutput.getCurrentStateMappedPartitions(resourceId)) {
+      if (!filteredPartitionSet.contains(TaskUtil.getPartitionId(partition.getPartitionName()))) {
+        // not computing old partitions
+        continue;
+      }
+      // Later putAll calls win: previous assignment < current state < pending state.
+      Map<String, String> allPreviousDecisionMap = Maps.newHashMap();
+      allPreviousDecisionMap.putAll(prevAssignment.getReplicaMap(partition));
+      allPreviousDecisionMap.putAll(currStateOutput.getCurrentStateMap(resourceId, partition));
+      allPreviousDecisionMap.putAll(currStateOutput.getPendingStateMap(resourceId, partition));
+      currentMapping.put(partition.getPartitionName(), allPreviousDecisionMap);
+    }
+
+    // Get the assignment keyed on partition
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
+            new AutoRebalanceStrategy.DefaultPlacementScheme());
+    List<String> allNodes =
+        Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
+    Collections.sort(allNodes);
+    ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes);
+    Map<String, List<String>> preferenceLists = record.getListFields();
+
+    // Convert to an assignment keyed on participant
+    Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
+    for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
+      int partitionId = TaskUtil.getPartitionId(e.getKey());
+      for (String participantName : e.getValue()) {
+        SortedSet<Integer> assigned = taskAssignment.get(participantName);
+        if (assigned == null) {
+          assigned = new TreeSet<Integer>();
+          taskAssignment.put(participantName, assigned);
+        }
+        assigned.add(partitionId);
+      }
+    }
+
+    // Finally, adjust the assignment if tasks have been failing
+    taskAssignment = _retryPolicy.reassign(jobCfg, jobContext, allNodes, taskAssignment);
+    return taskAssignment;
+  }
+
+  /**
+   * Filter a list of instances based on targeted resource policies
+   * @param jobCfg the job configuration
+   * @param currStateOutput the current state of all instances in the cluster
+   * @param instances valid instances
+   * @param cache current snapshot of the cluster
+   * @return a set of instances that can be assigned to
+   */
+  private Set<String> getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput,
+      Iterable<String> instances, ClusterDataCache cache) {
+    // No target resource means any instance is available
+    Set<String> allInstances = Sets.newHashSet(instances);
+    String targetResource = jobCfg.getTargetResource();
+    if (targetResource == null) {
+      return allInstances;
+    }
+
+    // Bad ideal state means don't assign
+    IdealState idealState = cache.getIdealState(targetResource);
+    if (idealState == null) {
+      return Collections.emptySet();
+    }
+
+    // Get the partitions on the target resource to use. Copy first: the set comes from the cached
+    // ideal state, and retainAll below must not modify (or fail on) that shared object.
+    Set<String> partitions = Sets.newHashSet(idealState.getPartitionSet());
+    List<String> targetPartitions = jobCfg.getTargetPartitions();
+    if (targetPartitions != null && !targetPartitions.isEmpty()) {
+      partitions.retainAll(targetPartitions);
+    }
+
+    // Based on state matches, add eligible instances
+    Set<String> eligibleInstances = Sets.newHashSet();
+    Set<String> targetStates = jobCfg.getTargetPartitionStates();
+    for (String partition : partitions) {
+      Map<String, String> stateMap =
+          currStateOutput.getCurrentStateMap(targetResource, new Partition(partition));
+      Map<String, String> pendingStateMap =
+          currStateOutput.getPendingStateMap(targetResource, new Partition(partition));
+      for (Map.Entry<String, String> e : stateMap.entrySet()) {
+        String instanceName = e.getKey();
+        String state = e.getValue();
+        String pending = pendingStateMap.get(instanceName);
+        if (pending != null) {
+          continue;
+        }
+        if (targetStates == null || targetStates.isEmpty() || targetStates.contains(state)) {
+          eligibleInstances.add(instanceName);
+        }
+      }
+    }
+    allInstances.retainAll(eligibleInstances);
+    return allInstances;
+  }
+
+  public interface RetryPolicy {
+    /**
+     * Adjust the assignment to allow for reassignment if a task keeps failing where it's currently
+     * assigned
+     * @param jobCfg the job configuration
+     * @param jobCtx the job context
+     * @param instances instances that can serve tasks
+     * @param origAssignment the unmodified assignment
+     * @return the adjusted assignment
+     */
+    Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
+        Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment);
+  }
+
+  private static class DefaultRetryReassigner implements RetryPolicy {
+    @Override
+    public Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
+        Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment) {
+      // Compute an increasing integer ID for each instance
+      BiMap<String, Integer> instanceMap = HashBiMap.create(instances.size());
+      int instanceIndex = 0;
+      for (String instance : instances) {
+        instanceMap.put(instance, instanceIndex++);
+      }
+
+      // Move partitions
+      Map<String, SortedSet<Integer>> newAssignment = Maps.newHashMap();
+      for (Map.Entry<String, SortedSet<Integer>> e : origAssignment.entrySet()) {
+        String instance = e.getKey();
+        SortedSet<Integer> partitions = e.getValue();
+        Integer instanceId = instanceMap.get(instance);
+        if (instanceId != null) {
+          for (int p : partitions) {
+            // Determine for each partition if there have been failures with the current assignment
+            // strategy, and if so, force a shift in assignment for that partition only
+            int shiftValue = getNumInstancesToShift(jobCfg, jobCtx, instances, p);
+            int newInstanceId = (instanceId + shiftValue) % instances.size();
+            String newInstance = instanceMap.inverse().get(newInstanceId);
+            if (newInstance == null) {
+              newInstance = instance;
+            }
+            if (!newAssignment.containsKey(newInstance)) {
+              newAssignment.put(newInstance, new TreeSet<Integer>());
+            }
+            newAssignment.get(newInstance).add(p);
+          }
+        } else {
+          // In case something goes wrong, just keep the previous assignment
+          newAssignment.put(instance, partitions);
+        }
+      }
+      return newAssignment;
+    }
+
+    /**
+     * In case tasks fail, we may not want to schedule them in the same place. This method allows us
+     * to compute a shifting value so that we can systematically choose other instances to try.
+     * Each candidate instance gets {@code maxAttempts / numInstances} attempts (at least one)
+     * before the partition is shifted to the next instance.
+     * @param jobCfg the job configuration
+     * @param jobCtx the job context
+     * @param instances instances that can be chosen
+     * @param p the partition to look up
+     * @return the shifting value (0 when there is no instance to shift to)
+     */
+    private int getNumInstancesToShift(JobConfig jobCfg, JobContext jobCtx,
+        Collection<String> instances, int p) {
+      int numAttempts = jobCtx.getPartitionNumAttempts(p);
+      int maxNumAttempts = jobCfg.getMaxAttemptsPerTask();
+      int numInstances = Math.min(instances.size(), jobCfg.getMaxForcedReassignmentsPerTask() + 1);
+      if (numInstances <= 0) {
+        return 0;
+      }
+      // Integer division yields 0 when maxNumAttempts < numInstances; clamp so we never divide by
+      // zero (each instance then gets a single attempt).
+      int attemptsPerInstance = Math.max(1, maxNumAttempts / numInstances);
+      return numAttempts / attemptsPerInstance;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java deleted file mode 100644 index f4145c5..0000000 --- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java +++ /dev/null @@ -1,273 +0,0 @@ -package org.apache.helix.task; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.helix.ZNRecord; -import org.apache.helix.controller.stages.ClusterDataCache; -import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.Partition; -import org.apache.helix.model.ResourceAssignment; - -import com.google.common.base.Function; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -/** - * This class does an assignment based on an automatic rebalancing strategy, rather than requiring - * assignment to target partitions and states of another resource - */ -public class GenericTaskRebalancer extends TaskRebalancer { - /** Reassignment policy for this algorithm */ - private RetryPolicy _retryPolicy = new DefaultRetryReassigner(); - - @Override - public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, - WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) { - Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap(); - Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap(); - for (TaskConfig taskCfg : taskMap.values()) { - String taskId = taskCfg.getId(); - int nextPartition = jobCtx.getPartitionSet().size(); - if (!taskIdMap.containsKey(taskId)) { - jobCtx.setTaskIdForPartition(nextPartition, taskId); - } - } - return jobCtx.getPartitionSet(); - } - - @Override - public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput, - ResourceAssignment prevAssignment, Collection<String> instances, 
JobConfig jobCfg, - final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, - Set<Integer> partitionSet, ClusterDataCache cache) { - // Gather input to the full auto rebalancing algorithm - LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>(); - states.put("ONLINE", 1); - - // Only map partitions whose assignment we care about - final Set<TaskPartitionState> honoredStates = - Sets.newHashSet(TaskPartitionState.INIT, TaskPartitionState.RUNNING, - TaskPartitionState.STOPPED); - Set<Integer> filteredPartitionSet = Sets.newHashSet(); - for (Integer p : partitionSet) { - TaskPartitionState state = (jobContext == null) ? null : jobContext.getPartitionState(p); - if (state == null || honoredStates.contains(state)) { - filteredPartitionSet.add(p); - } - } - - // Transform from partition id to fully qualified partition name - List<Integer> partitionNums = Lists.newArrayList(partitionSet); - Collections.sort(partitionNums); - final String resourceId = prevAssignment.getResourceName(); - List<String> partitions = - new ArrayList<String>(Lists.transform(partitionNums, new Function<Integer, String>() { - @Override - public String apply(Integer partitionNum) { - return resourceId + "_" + partitionNum; - } - })); - - // Compute the current assignment - Map<String, Map<String, String>> currentMapping = Maps.newHashMap(); - for (Partition partition : currStateOutput.getCurrentStateMappedPartitions(resourceId)) { - if (!filteredPartitionSet.contains(pId(partition.getPartitionName()))) { - // not computing old partitions - continue; - } - Map<String, String> allPreviousDecisionMap = Maps.newHashMap(); - if (prevAssignment != null) { - allPreviousDecisionMap.putAll(prevAssignment.getReplicaMap(partition)); - } - allPreviousDecisionMap.putAll(currStateOutput.getCurrentStateMap(resourceId, partition)); - allPreviousDecisionMap.putAll(currStateOutput.getPendingStateMap(resourceId, partition)); - 
currentMapping.put(partition.getPartitionName(), allPreviousDecisionMap); - } - - // Get the assignment keyed on partition - AutoRebalanceStrategy strategy = - new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE, - new AutoRebalanceStrategy.DefaultPlacementScheme()); - List<String> allNodes = - Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache)); - Collections.sort(allNodes); - ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes); - Map<String, List<String>> preferenceLists = record.getListFields(); - - // Convert to an assignment keyed on participant - Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap(); - for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) { - String partitionName = e.getKey(); - partitionName = String.valueOf(pId(partitionName)); - List<String> preferenceList = e.getValue(); - for (String participantName : preferenceList) { - if (!taskAssignment.containsKey(participantName)) { - taskAssignment.put(participantName, new TreeSet<Integer>()); - } - taskAssignment.get(participantName).add(Integer.valueOf(partitionName)); - } - } - - // Finally, adjust the assignment if tasks have been failing - taskAssignment = _retryPolicy.reassign(jobCfg, jobContext, allNodes, taskAssignment); - return taskAssignment; - } - - /** - * Filter a list of instances based on targeted resource policies - * @param jobCfg the job configuration - * @param currStateOutput the current state of all instances in the cluster - * @param instances valid instances - * @param cache current snapshot of the cluster - * @return a set of instances that can be assigned to - */ - private Set<String> getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput, - Iterable<String> instances, ClusterDataCache cache) { - // No target resource means any instance is available - Set<String> allInstances = Sets.newHashSet(instances); - String targetResource = 
jobCfg.getTargetResource(); - if (targetResource == null) { - return allInstances; - } - - // Bad ideal state means don't assign - IdealState idealState = cache.getIdealState(targetResource); - if (idealState == null) { - return Collections.emptySet(); - } - - // Get the partitions on the target resource to use - Set<String> partitions = idealState.getPartitionSet(); - List<String> targetPartitions = jobCfg.getTargetPartitions(); - if (targetPartitions != null && !targetPartitions.isEmpty()) { - partitions.retainAll(targetPartitions); - } - - // Based on state matches, add eligible instances - Set<String> eligibleInstances = Sets.newHashSet(); - Set<String> targetStates = jobCfg.getTargetPartitionStates(); - for (String partition : partitions) { - Map<String, String> stateMap = - currStateOutput.getCurrentStateMap(targetResource, new Partition(partition)); - Map<String, String> pendingStateMap = - currStateOutput.getPendingStateMap(targetResource, new Partition(partition)); - for (Map.Entry<String, String> e : stateMap.entrySet()) { - String instanceName = e.getKey(); - String state = e.getValue(); - String pending = pendingStateMap.get(instanceName); - if (pending != null) { - continue; - } - if (targetStates == null || targetStates.isEmpty() || targetStates.contains(state)) { - eligibleInstances.add(instanceName); - } - } - } - allInstances.retainAll(eligibleInstances); - return allInstances; - } - - public interface RetryPolicy { - /** - * Adjust the assignment to allow for reassignment if a task keeps failing where it's currently - * assigned - * @param jobCfg the job configuration - * @param jobCtx the job context - * @param instances instances that can serve tasks - * @param origAssignment the unmodified assignment - * @return the adjusted assignment - */ - Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx, - Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment); - } - - private static class 
DefaultRetryReassigner implements RetryPolicy { - @Override - public Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx, - Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment) { - // Compute an increasing integer ID for each instance - BiMap<String, Integer> instanceMap = HashBiMap.create(instances.size()); - int instanceIndex = 0; - for (String instance : instances) { - instanceMap.put(instance, instanceIndex++); - } - - // Move partitions - Map<String, SortedSet<Integer>> newAssignment = Maps.newHashMap(); - for (Map.Entry<String, SortedSet<Integer>> e : origAssignment.entrySet()) { - String instance = e.getKey(); - SortedSet<Integer> partitions = e.getValue(); - Integer instanceId = instanceMap.get(instance); - if (instanceId != null) { - for (int p : partitions) { - // Determine for each partition if there have been failures with the current assignment - // strategy, and if so, force a shift in assignment for that partition only - int shiftValue = getNumInstancesToShift(jobCfg, jobCtx, instances, p); - int newInstanceId = (instanceId + shiftValue) % instances.size(); - String newInstance = instanceMap.inverse().get(newInstanceId); - if (newInstance == null) { - newInstance = instance; - } - if (!newAssignment.containsKey(newInstance)) { - newAssignment.put(newInstance, new TreeSet<Integer>()); - } - newAssignment.get(newInstance).add(p); - } - } else { - // In case something goes wrong, just keep the previous assignment - newAssignment.put(instance, partitions); - } - } - return newAssignment; - } - - /** - * In case tasks fail, we may not want to schedule them in the same place. 
This method allows us - * to compute a shifting value so that we can systematically choose other instances to try - * @param jobCfg the job configuration - * @param jobCtx the job context - * @param instances instances that can be chosen - * @param p the partition to look up - * @return the shifting value - */ - private int getNumInstancesToShift(JobConfig jobCfg, JobContext jobCtx, - Collection<String> instances, int p) { - int numAttempts = jobCtx.getPartitionNumAttempts(p); - int maxNumAttempts = jobCfg.getMaxAttemptsPerTask(); - int numInstances = Math.min(instances.size(), jobCfg.getMaxForcedReassignmentsPerTask() + 1); - return numAttempts / (maxNumAttempts / numInstances); - } - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java new file mode 100644 index 0000000..0e2ab15 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -0,0 +1,650 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import org.apache.helix.*; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.*; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +/** + * Custom rebalancer implementation for the {@code Job} in task model. + */ +public class JobRebalancer extends TaskRebalancer { + private static final Logger LOG = Logger.getLogger(JobRebalancer.class); + private static TaskAssignmentCalculator fixTaskAssignmentCal = + new FixedTargetTaskAssignmentCalculator(); + private static TaskAssignmentCalculator genericTaskAssignmentCal = + new GenericTaskAssignmentCalculator(); + + private static final String PREV_RA_NODE = "PreviousResourceAssignment"; + + @Override + public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, + IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) { + final String jobName = resource.getResourceName(); + LOG.debug("Computer Best Partition for job: " + jobName); + + // Fetch job configuration + JobConfig jobCfg = TaskUtil.getJobCfg(_manager, jobName); + if (jobCfg == null) { + LOG.error("Job configuration is NULL for " + jobName); + return buildEmptyAssignment(jobName, currStateOutput); + } + String workflowResource = jobCfg.getWorkflow(); + + // Fetch workflow configuration and context + WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource); 
+ if (workflowCfg == null) { + LOG.error("Workflow configuration is NULL for " + jobName); + return buildEmptyAssignment(jobName, currStateOutput); + } + + WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource); + if (workflowCtx == null) { + LOG.error("Workflow context is NULL for " + jobName); + return buildEmptyAssignment(jobName, currStateOutput); + } + + TargetState targetState = workflowCfg.getTargetState(); + if (targetState != TargetState.START && targetState != TargetState.STOP) { + LOG.info("Target state is " + targetState.name() + " for workflow " + workflowResource + + ".Stop scheduling job " + jobName); + return buildEmptyAssignment(jobName, currStateOutput); + } + + TaskState jobState = workflowCtx.getJobState(jobName); + // The job is already in a final state (completed/failed). + if (jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) { + LOG.info("Job " + jobName + " is failed or already completed, clean up IS."); + TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName); + _scheduledRebalancer.removeScheduledRebalance(jobName); + return buildEmptyAssignment(jobName, currStateOutput); + } + + if (!isWorkflowReadyForSchedule(workflowCfg)) { + LOG.info("Job is not ready to be scheduled since workflow is not ready " + jobName); + return buildEmptyAssignment(jobName, currStateOutput); + } + + if (!isJobReadyToSchedule(jobName, workflowCfg, workflowCtx)) { + LOG.info("Job is not ready to be scheduled " + jobName); + return buildEmptyAssignment(jobName, currStateOutput); + } + + // Fetch any existing context information from the property store. 
+ JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName); + if (jobCtx == null) { + jobCtx = new JobContext(new ZNRecord("TaskContext")); + jobCtx.setStartTime(System.currentTimeMillis()); + } + + // Grab the old assignment, or an empty one if it doesn't exist + ResourceAssignment prevAssignment = getPrevResourceAssignment(jobName); + if (prevAssignment == null) { + prevAssignment = new ResourceAssignment(jobName); + } + + // Will contain the list of partitions that must be explicitly dropped from the ideal state that + // is stored in zk. + // Fetch the previous resource assignment from the property store. This is required because of + // HELIX-230. + Set<Integer> partitionsToDrop = new TreeSet<Integer>(); + ResourceAssignment newAssignment = + computeResourceMapping(jobName, workflowCfg, jobCfg, prevAssignment, clusterData + .getLiveInstances().keySet(), currStateOutput, workflowCtx, jobCtx, partitionsToDrop, + clusterData); + + if (!partitionsToDrop.isEmpty()) { + for (Integer pId : partitionsToDrop) { + taskIs.getRecord().getMapFields().remove(pName(jobName, pId)); + } + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + PropertyKey propertyKey = accessor.keyBuilder().idealStates(jobName); + accessor.setProperty(propertyKey, taskIs); + } + + // Update rebalancer context, previous ideal state. 
+ TaskUtil.setJobContext(_manager, jobName, jobCtx); + TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx); + setPrevResourceAssignment(jobName, newAssignment); + + LOG.debug("Job " + jobName + " new assignment " + Arrays + .toString(newAssignment.getMappedPartitions().toArray())); + + return newAssignment; + } + + private Set<String> getInstancesAssignedToOtherJobs(String currentJobName, + WorkflowConfig workflowCfg) { + Set<String> ret = new HashSet<String>(); + for (String jobName : workflowCfg.getJobDag().getAllNodes()) { + if (jobName.equals(currentJobName)) { + continue; + } + JobContext jobContext = TaskUtil.getJobContext(_manager, jobName); + if (jobContext == null) { + continue; + } + for (int partition : jobContext.getPartitionSet()) { + TaskPartitionState partitionState = jobContext.getPartitionState(partition); + if (partitionState == TaskPartitionState.INIT || + partitionState == TaskPartitionState.RUNNING) { + ret.add(jobContext.getAssignedParticipant(partition)); + } + } + } + + return ret; + } + + private ResourceAssignment computeResourceMapping(String jobResource, + WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment, + Collection<String> liveInstances, CurrentStateOutput currStateOutput, + WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs, + ClusterDataCache cache) { + TargetState jobTgtState = workflowConfig.getTargetState(); + // Update running status in workflow context + if (jobTgtState == TargetState.STOP) { + workflowCtx.setJobState(jobResource, TaskState.STOPPED); + // Workflow has been stopped if all in progress jobs are stopped + if (isWorkflowStopped(workflowCtx, workflowConfig)) { + workflowCtx.setWorkflowState(TaskState.STOPPED); + } + } else { + workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS); + // Workflow is in progress if any task is in progress + workflowCtx.setWorkflowState(TaskState.IN_PROGRESS); + } + + // Used to keep track of tasks 
that have already been assigned to instances. + Set<Integer> assignedPartitions = new HashSet<Integer>(); + + // Used to keep track of tasks that have failed, but whose failure is acceptable + Set<Integer> skippedPartitions = new HashSet<Integer>(); + + // Keeps a mapping of (partition) -> (instance, state) + Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>(); + + Set<String> excludedInstances = getInstancesAssignedToOtherJobs(jobResource, workflowConfig); + + // Process all the current assignments of tasks. + TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg); + Set<Integer> allPartitions = + taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache); + Map<String, SortedSet<Integer>> taskAssignments = + getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions); + long currentTime = System.currentTimeMillis(); + for (String instance : taskAssignments.keySet()) { + if (excludedInstances.contains(instance)) { + continue; + } + + Set<Integer> pSet = taskAssignments.get(instance); + // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT, + // TASK_ERROR, ERROR. + Set<Integer> donePartitions = new TreeSet<Integer>(); + for (int pId : pSet) { + final String pName = pName(jobResource, pId); + + // Check for pending state transitions on this (partition, instance). + Message pendingMessage = + currStateOutput.getPendingState(jobResource, new Partition(pName), instance); + if (pendingMessage != null) { + // There is a pending state transition for this (partition, instance). Just copy forward + // the state assignment from the previous ideal state. 
+ Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName)); + if (stateMap != null) { + String prevState = stateMap.get(instance); + paMap.put(pId, new PartitionAssignment(instance, prevState)); + assignedPartitions.add(pId); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.", + pName, instance, prevState)); + } + } + + continue; + } + + TaskPartitionState currState = + TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition( + pName), instance)); + jobCtx.setPartitionState(pId, currState); + + // Process any requested state transitions. + String requestedStateStr = + currStateOutput.getRequestedState(jobResource, new Partition(pName), instance); + if (requestedStateStr != null && !requestedStateStr.isEmpty()) { + TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr); + if (requestedState.equals(currState)) { + LOG.warn(String.format( + "Requested state %s is the same as the current state for instance %s.", + requestedState, instance)); + } + + paMap.put(pId, new PartitionAssignment(instance, requestedState.name())); + assignedPartitions.add(pId); + LOG.debug(String.format( + "Instance %s requested a state transition to %s for partition %s.", instance, + requestedState, pName)); + continue; + } + + switch (currState) { + case RUNNING: + case STOPPED: { + TaskPartitionState nextState; + if (jobTgtState == TargetState.START) { + nextState = TaskPartitionState.RUNNING; + } else { + nextState = TaskPartitionState.STOPPED; + } + + paMap.put(pId, new PartitionAssignment(instance, nextState.name())); + assignedPartitions.add(pId); + LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, + nextState, instance)); + } + break; + case COMPLETED: { + // The task has completed on this partition. Mark as such in the context object. 
+ donePartitions.add(pId); + LOG.debug(String + .format( + "Task partition %s has completed with state %s. Marking as such in rebalancer context.", + pName, currState)); + partitionsToDropFromIs.add(pId); + markPartitionCompleted(jobCtx, pId); + } + break; + case TIMED_OUT: + case TASK_ERROR: + case ERROR: { + donePartitions.add(pId); // The task may be rescheduled on a different instance. + LOG.debug(String.format( + "Task partition %s has error state %s. Marking as such in rebalancer context.", + pName, currState)); + markPartitionError(jobCtx, pId, currState, true); + // The error policy is to fail the task as soon a single partition fails for a specified + // maximum number of attempts. + if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) { + // If the user does not require this task to succeed in order for the job to succeed, + // then we don't have to fail the job right now + boolean successOptional = false; + String taskId = jobCtx.getTaskIdForPartition(pId); + if (taskId != null) { + TaskConfig taskConfig = jobCfg.getTaskConfig(taskId); + if (taskConfig != null) { + successOptional = taskConfig.isSuccessOptional(); + } + } + + // Similarly, if we have some leeway for how many tasks we can fail, then we don't have + // to fail the job immediately + if (skippedPartitions.size() < jobCfg.getFailureThreshold()) { + successOptional = true; + } + + if (!successOptional) { + long finishTime = currentTime; + workflowCtx.setJobState(jobResource, TaskState.FAILED); + if (workflowConfig.isTerminable()) { + workflowCtx.setWorkflowState(TaskState.FAILED); + workflowCtx.setFinishTime(finishTime); + } + jobCtx.setFinishTime(finishTime); + markAllPartitionsError(jobCtx, currState, false); + addAllPartitions(allPartitions, partitionsToDropFromIs); + + // remove IdealState of this job + TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); + return buildEmptyAssignment(jobResource, currStateOutput); + } else { + 
skippedPartitions.add(pId); + partitionsToDropFromIs.add(pId); + } + } else { + // Mark the task to be started at some later time (if enabled) + markPartitionDelayed(jobCfg, jobCtx, pId); + } + } + break; + case INIT: + case DROPPED: { + // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned. + donePartitions.add(pId); + LOG.debug(String.format( + "Task partition %s has state %s. It will be dropped from the current ideal state.", + pName, currState)); + } + break; + default: + throw new AssertionError("Unknown enum symbol: " + currState); + } + } + + // Remove the set of task partitions that are completed or in one of the error states. + pSet.removeAll(donePartitions); + } + + // For delayed tasks, trigger a rebalance event for the closest upcoming ready time + scheduleForNextTask(jobResource, jobCtx, currentTime); + + if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) { + workflowCtx.setJobState(jobResource, TaskState.COMPLETED); + jobCtx.setFinishTime(currentTime); + if (isWorkflowComplete(workflowCtx, workflowConfig)) { + workflowCtx.setWorkflowState(TaskState.COMPLETED); + workflowCtx.setFinishTime(currentTime); + } + + // remove IdealState of this job + TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); + } + + // Make additional task assignments if needed. + if (jobTgtState == TargetState.START) { + // Contains the set of task partitions that must be excluded from consideration when making + // any new assignments. + // This includes all completed, failed, delayed, and already assigned partitions. + Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions); + addCompletedPartitions(excludeSet, jobCtx, allPartitions); + addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg); + excludeSet.addAll(skippedPartitions); + excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime)); + // Get instance->[partition, ...] mappings for the target resource. 
+ Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal + .getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx, + workflowConfig, workflowCtx, allPartitions, cache); + for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) { + String instance = entry.getKey(); + if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances + .contains(instance)) { + continue; + } + // Contains the set of task partitions currently assigned to the instance. + Set<Integer> pSet = entry.getValue(); + int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size(); + if (numToAssign > 0) { + List<Integer> nextPartitions = + getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign); + for (Integer pId : nextPartitions) { + String pName = pName(jobResource, pId); + paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name())); + excludeSet.add(pId); + jobCtx.setAssignedParticipant(pId, instance); + jobCtx.setPartitionState(pId, TaskPartitionState.INIT); + LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, + TaskPartitionState.RUNNING, instance)); + } + } + } + } + + // Construct a ResourceAssignment object from the map of partition assignments. 
+ ResourceAssignment ra = new ResourceAssignment(jobResource); + for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) { + PartitionAssignment pa = e.getValue(); + ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())), + ImmutableMap.of(pa._instance, pa._state)); + } + + return ra; + } + + private void scheduleForNextTask(String job, JobContext jobCtx, long now) { + // Clear current entries if they exist and are expired + long currentTime = now; + long scheduledTime = _scheduledRebalancer.getRebalanceTime(job); + if (scheduledTime > 0 && currentTime > scheduledTime) { + _scheduledRebalancer.removeScheduledRebalance(job); + } + + // Figure out the earliest schedulable time in the future of a non-complete job + boolean shouldSchedule = false; + long earliestTime = Long.MAX_VALUE; + for (int p : jobCtx.getPartitionSet()) { + long retryTime = jobCtx.getNextRetryTime(p); + TaskPartitionState state = jobCtx.getPartitionState(p); + state = (state != null) ? state : TaskPartitionState.INIT; + Set<TaskPartitionState> errorStates = + Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR, + TaskPartitionState.TIMED_OUT); + if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) { + earliestTime = retryTime; + shouldSchedule = true; + } + } + + // If any was found, then schedule it + if (shouldSchedule) { + _scheduledRebalancer.scheduleRebalance(_manager, job, earliestTime); + } + } + + /** + * Get the last task assignment for a given job + * + * @param resourceName the name of the job + * @return {@link ResourceAssignment} instance, or null if no assignment is available + */ + private ResourceAssignment getPrevResourceAssignment(String resourceName) { + ZNRecord r = _manager.getHelixPropertyStore() + .get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE), + null, AccessOption.PERSISTENT); + return r != null ? 
new ResourceAssignment(r) : null; + } + + /** + * Set the last task assignment for a given job + * + * @param resourceName the name of the job + * @param ra {@link ResourceAssignment} containing the task assignment + */ + private void setPrevResourceAssignment(String resourceName, + ResourceAssignment ra) { + _manager.getHelixPropertyStore() + .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE), + ra.getRecord(), AccessOption.PERSISTENT); + } + + /** + * Checks if the job has completed. + * @param ctx The rebalancer context. + * @param allPartitions The set of partitions to check. + * @param skippedPartitions partitions that failed, but whose failure is acceptable + * @return true if all task partitions have been marked with status + * {@link TaskPartitionState#COMPLETED} in the rebalancer + * context, false otherwise. + */ + private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions, + Set<Integer> skippedPartitions, JobConfig cfg) { + for (Integer pId : allPartitions) { + TaskPartitionState state = ctx.getPartitionState(pId); + if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED + && !isTaskGivenup(ctx, cfg, pId)) { + return false; + } + } + return true; + } + + + private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) { + for (Integer pId : toAdd) { + destination.add(pId); + } + } + + private static void addCompletedPartitions(Set<Integer> set, JobContext ctx, + Iterable<Integer> pIds) { + for (Integer pId : pIds) { + TaskPartitionState state = ctx.getPartitionState(pId); + if (state == TaskPartitionState.COMPLETED) { + set.add(pId); + } + } + } + + private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) { + return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask(); + } + + // add all partitions that have been tried maxNumberAttempts + private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, 
Iterable<Integer> pIds, + JobConfig cfg) { + for (Integer pId : pIds) { + if (isTaskGivenup(ctx, cfg, pId)) { + set.add(pId); + } + } + } + + private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions, + Set<Integer> excluded, int n) { + List<Integer> result = new ArrayList<Integer>(); + for (Integer pId : candidatePartitions) { + if (result.size() >= n) { + break; + } + + if (!excluded.contains(pId)) { + result.add(pId); + } + } + + return result; + } + + private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) { + long delayInterval = cfg.getTaskRetryDelay(); + if (delayInterval <= 0) { + return; + } + long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval; + ctx.setNextRetryTime(p, nextStartTime); + } + + private static void markPartitionCompleted(JobContext ctx, int pId) { + ctx.setPartitionState(pId, TaskPartitionState.COMPLETED); + ctx.setPartitionFinishTime(pId, System.currentTimeMillis()); + ctx.incrementNumAttempts(pId); + } + + private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state, + boolean incrementAttempts) { + ctx.setPartitionState(pId, state); + ctx.setPartitionFinishTime(pId, System.currentTimeMillis()); + if (incrementAttempts) { + ctx.incrementNumAttempts(pId); + } + } + + private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state, + boolean incrementAttempts) { + for (int pId : ctx.getPartitionSet()) { + markPartitionError(ctx, pId, state, incrementAttempts); + } + } + + /** + * Return the assignment of task partitions per instance. 
+ */ + private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments( + Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) { + Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>(); + for (String instance : instanceList) { + result.put(instance, new TreeSet<Integer>()); + } + + for (Partition partition : assignment.getMappedPartitions()) { + int pId = TaskUtil.getPartitionId(partition.getPartitionName()); + if (includeSet.contains(pId)) { + Map<String, String> replicaMap = assignment.getReplicaMap(partition); + for (String instance : replicaMap.keySet()) { + SortedSet<Integer> pList = result.get(instance); + if (pList != null) { + pList.add(pId); + } + } + } + } + return result; + } + + private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) { + Set<Integer> nonReadyPartitions = Sets.newHashSet(); + for (int p : ctx.getPartitionSet()) { + long toStart = ctx.getNextRetryTime(p); + if (now < toStart) { + nonReadyPartitions.add(p); + } + } + return nonReadyPartitions; + } + + private TaskAssignmentCalculator getAssignmentCalulator(JobConfig jobConfig) { + Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap(); + if (taskConfigMap != null && !taskConfigMap.isEmpty()) { + return genericTaskAssignmentCal; + } else { + return fixTaskAssignmentCal; + } + } + + /** + * Computes the partition name given the resource name and partition id. + */ + private String pName(String resource, int pId) { + return resource + "_" + pId; + } + + /** + * An (instance, state) pair. 
+ */ + private static class PartitionAssignment { + private final String _instance; + private final String _state; + + private PartitionAssignment(String instance, String state) { + _instance = instance; + _state = state; + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java new file mode 100644 index 0000000..a3ed5ab --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java @@ -0,0 +1,45 @@ +package org.apache.helix.task; + +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.ResourceAssignment; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; + +public abstract class TaskAssignmentCalculator { + /** + * Get all the partitions that should be created by this task + * + * @param jobCfg the task configuration + * @param jobCtx the task context + * @param workflowCfg the workflow configuration + * @param workflowCtx the workflow context + * @param cache cluster snapshot + * @return set of partition numbers + */ + public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, + WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache); + + /** + * Compute an assignment of tasks to instances + * + * @param currStateOutput the current state of the instances + * @param prevAssignment the previous task partition assignment + * @param instances the instances + * @param jobCfg the task configuration + * @param jobContext the task context + * @param workflowCfg the workflow configuration + * @param 
workflowCtx the workflow context + * @param partitionSet the partitions to assign + * @param cache cluster snapshot + * @return map of instances to set of partition numbers + */ + public abstract Map<String, SortedSet<Integer>> getTaskAssignment( + CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment, + Collection<String> instances, JobConfig jobCfg, JobContext jobContext, + WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet, + ClusterDataCache cache); +} http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index 654ba4e..9b64aec 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -188,20 +188,21 @@ public class TaskDriver { LOG.info("Starting workflow " + flow.getName()); flow.validate(); - String flowName = flow.getName(); - - // first, add workflow config to ZK - _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName), + // first, add workflow config. + _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flow.getName()), flow.getWorkflowConfig().getResourceConfigMap()); - // then schedule jobs + // then add all job configs. 
for (String job : flow.getJobConfigs().keySet()) { - JobConfig.Builder builder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job)); + JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job)); if (flow.getTaskConfigs() != null && flow.getTaskConfigs().containsKey(job)) { - builder.addTaskConfigs(flow.getTaskConfigs().get(job)); + jobCfgBuilder.addTaskConfigs(flow.getTaskConfigs().get(job)); } - scheduleJob(job, builder.build()); + addJobConfig(job, jobCfgBuilder.build()); } + + // Finally add workflow resource. + addWorkflowResource(flow.getName()); } /** Creates a new named job queue (workflow) */ @@ -210,6 +211,7 @@ public class TaskDriver { } /** Flushes a named job queue */ + // TODO: need to make sure the queue is stopped or completed before flush the queue. public void flushQueue(String queueName) throws Exception { WorkflowConfig config = TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName); @@ -351,54 +353,57 @@ public class TaskDriver { _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT); } - /** Remove the job name from the DAG from the queue configuration */ + /** + * Remove the job name from the DAG from the queue configuration + */ private void removeJobFromDag(final String queueName, final String jobName) { final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName); - DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { + DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() { @Override public ZNRecord update(ZNRecord currentData) { + if (currentData == null) { + LOG.error("Could not update DAG for queue: " + queueName + " ZNRecord is null."); + return null; + } // Add the node to the existing DAG JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG)); Set<String> allNodes = jobDag.getAllNodes(); if (!allNodes.contains(namespacedJobName)) { - LOG.warn("Could not delete job from queue " + queueName + ", job " + 
jobName + " not exists"); - } else { - String parent = null; - String child = null; - // remove the node from the queue - for (String node : allNodes) { - if (!node.equals(namespacedJobName)) { - if (jobDag.getDirectChildren(node).contains(namespacedJobName)) { - parent = node; - jobDag.removeParentToChild(parent, namespacedJobName); - } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) { - child = node; - jobDag.removeParentToChild(namespacedJobName, child); - } - } - } - - if (parent != null && child != null) { - jobDag.addParentToChild(parent, child); + LOG.warn( + "Could not delete job from queue " + queueName + ", job " + jobName + " not exists"); + return currentData; + } + String parent = null; + String child = null; + // remove the node from the queue + for (String node : allNodes) { + if (jobDag.getDirectChildren(node).contains(namespacedJobName)) { + parent = node; + jobDag.removeParentToChild(parent, namespacedJobName); + } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) { + child = node; + jobDag.removeParentToChild(namespacedJobName, child); } + } + if (parent != null && child != null) { + jobDag.addParentToChild(parent, child); + } + jobDag.removeNode(namespacedJobName); - jobDag.removeNode(namespacedJobName); - - // Save the updated DAG - try { - currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson()); - } catch (Exception e) { - throw new IllegalStateException("Could not remove job " + jobName + " from DAG of queue " + queueName, e); - } + // Save the updated DAG + try { + currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson()); + } catch (Exception e) { + throw new IllegalStateException( + "Could not remove job " + jobName + " from DAG of queue " + queueName, e); } return currentData; } }; String path = _accessor.keyBuilder().resourceConfig(queueName).getPath(); - boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT); - if (!status) { + if 
(!_accessor.getBaseDataAccessor().update(path, dagRemover, AccessOption.PERSISTENT)) { throw new IllegalArgumentException( "Could not remove job " + jobName + " from DAG of queue " + queueName); } @@ -449,8 +454,12 @@ public class TaskDriver { // Create the job to ensure that it validates JobConfig jobConfig = jobBuilder.setWorkflow(queueName).build(); - // Add the job to the end of the queue in the DAG final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName); + + // add job config first. + addJobConfig(namespacedJobName, jobConfig); + + // Add the job to the end of the queue in the DAG DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { @Override public ZNRecord update(ZNRecord currentData) { @@ -495,22 +504,38 @@ public class TaskDriver { throw new IllegalArgumentException("Could not enqueue job"); } // Schedule the job - scheduleJob(namespacedJobName, jobConfig); + TaskUtil.invokeRebalance(_accessor, queueName); } - /** Posts new job to cluster */ - private void scheduleJob(String jobResource, JobConfig jobConfig) throws Exception { - // Set up job resource based on partitions from target resource - int numIndependentTasks = jobConfig.getTaskConfigMap().size(); - int numPartitions = - (numIndependentTasks > 0) ? 
numIndependentTasks : _admin - .getResourceIdealState(_clusterName, jobConfig.getTargetResource()).getPartitionSet() - .size(); - _admin.addResource(_clusterName, jobResource, numPartitions, TaskConstants.STATE_MODEL_NAME); + /** Posts new workflow resource to cluster */ + private void addWorkflowResource(String workflow) { + // Add workflow resource + _admin.addResource(_clusterName, workflow, 1, TaskConstants.STATE_MODEL_NAME); + + // Push out new ideal state for the workflow + CustomModeISBuilder IsBuilder = new CustomModeISBuilder(workflow); + IsBuilder.setRebalancerMode(IdealState.RebalanceMode.TASK) + .setNumReplica(1).setNumPartitions(1) + .setStateModel(TaskConstants.STATE_MODEL_NAME) + .setDisableExternalView(true); + + IdealState is = IsBuilder.build(); + is.getRecord().setListField(workflow, new ArrayList<String>()); + is.getRecord().setMapField(workflow, new HashMap<String, String>()); + is.setRebalancerClassName(WorkflowRebalancer.class.getName()); + _admin.setResourceIdealState(_clusterName, workflow, is); + + } + + /** + * Add new job config to cluster + */ + private void addJobConfig(String jobName, JobConfig jobConfig) { + LOG.info("Add job configuration " + jobName); // Set the job configuration PropertyKey.Builder keyBuilder = _accessor.keyBuilder(); - HelixProperty resourceConfig = new HelixProperty(jobResource); + HelixProperty resourceConfig = new HelixProperty(jobName); resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap()); Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap(); if (taskConfigMap != null) { @@ -518,30 +543,10 @@ public class TaskDriver { resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap()); } } - _accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig); - - // Push out new ideal state based on number of target partitions - CustomModeISBuilder builder = new CustomModeISBuilder(jobResource); - 
builder.setRebalancerMode(IdealState.RebalanceMode.TASK); - builder.setNumReplica(1); - builder.setNumPartitions(numPartitions); - builder.setStateModel(TaskConstants.STATE_MODEL_NAME); - if (jobConfig.isDisableExternalView()) { - builder.setDisableExternalView(jobConfig.isDisableExternalView()); + if (!_accessor.setProperty(keyBuilder.resourceConfig(jobName), resourceConfig)) { + LOG.error("Failed to add job configuration for job " + jobName); } - - IdealState is = builder.build(); - for (int i = 0; i < numPartitions; i++) { - is.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>()); - is.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, String>()); - } - if (taskConfigMap != null && !taskConfigMap.isEmpty()) { - is.setRebalancerClassName(GenericTaskRebalancer.class.getName()); - } else { - is.setRebalancerClassName(FixedTargetTaskRebalancer.class.getName()); - } - _admin.setResourceIdealState(_clusterName, jobResource, is); } /** Public method to resume a workflow/queue */ @@ -565,52 +570,47 @@ public class TaskDriver { private void setWorkflowTargetState(String workflowName, TargetState state) { setSingleWorkflowTargetState(workflowName, state); - // TODO: this is the temporary fix for current task rebalance implementation. - // We should fix this in new task framework implementation. + // TODO: just need to change the lastScheduledWorkflow. List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs()); for (String resource : resources) { if (resource.startsWith(workflowName)) { setSingleWorkflowTargetState(resource, state); } } - - /* TODO: use this code for new task framework. 
- // For recurring schedules, last scheduled incomplete workflow must also be handled - WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflowName); - String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow(); - if (lastScheduledWorkflow != null) { - WorkflowContext lastScheduledWorkflowCtx = - TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow); - if (lastScheduledWorkflowCtx != null && !( - lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED - || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) { - setSingleWorkflowTargetState(lastScheduledWorkflow, state); - } - } - */ } /** Helper function to change target state for a given workflow */ private void setSingleWorkflowTargetState(String workflowName, final TargetState state) { + LOG.info("Set " + workflowName + " to target state " + state); DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { @Override public ZNRecord update(ZNRecord currentData) { - if (currentData != null){ + if (currentData != null) { // Only update target state for non-completed workflows String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME); if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) { currentData.setSimpleField(WorkflowConfig.TARGET_STATE, state.name()); + } else { + LOG.info("TargetState DataUpdater: ignore to update target state " + finishTime); } + } else { + LOG.error("TargetState DataUpdater: Fails to update target state " + currentData); } return currentData; } }; List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList(); - updaters.add(updater); List<String> paths = Lists.newArrayList(); - paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath()); - _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT); - invokeRebalance(); + + PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(_accessor, workflowName); + if (_accessor.getProperty(cfgKey) != null) { + 
paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath()); + updaters.add(updater); + _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT); + TaskUtil.invokeRebalance(_accessor, workflowName); + } else { + LOG.error("Configuration path " + cfgKey + " not found!"); + } } public void list(String resource) { @@ -666,21 +666,6 @@ public class TaskDriver { } } - /** - * Hack to invoke rebalance until bug concerning resource config changes not driving rebalance is - * fixed - */ - public void invokeRebalance() { - // find a task - for (String resource : _admin.getResourcesInCluster(_clusterName)) { - IdealState is = _admin.getResourceIdealState(_clusterName, resource); - if (is != null && is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) { - _accessor.updateProperty(_accessor.keyBuilder().idealStates(resource), is); - break; - } - } - } - /** Constructs options set for all basic control messages */ private static Options constructOptions() { Options options = new Options();
