http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index 3842b66..1526883 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -19,679 +19,56 @@ package org.apache.helix.task; * under the License. */ -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Date; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.I0Itec.zkclient.DataUpdater; -import org.apache.helix.AccessOption; -import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixManager; -import org.apache.helix.PropertyKey; -import org.apache.helix.ZNRecord; import org.apache.helix.controller.rebalancer.Rebalancer; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; -import org.apache.helix.model.Message; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; import org.apache.log4j.Logger; -import 
com.google.common.base.Joiner; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; /** - * Custom rebalancer implementation for the {@code Task} state model. + * Abstract rebalancer class for the {@code Task} state model. */ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { private static final Logger LOG = Logger.getLogger(TaskRebalancer.class); - // Management of already-scheduled rebalances across jobs - private static final BiMap<String, Date> SCHEDULED_TIMES = HashBiMap.create(); - private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors - .newSingleThreadScheduledExecutor(); - // For connection management - private HelixManager _manager; - - /** - * Get all the partitions that should be created by this task - * @param jobCfg the task configuration - * @param jobCtx the task context - * @param workflowCfg the workflow configuration - * @param workflowCtx the workflow context - * @param cache cluster snapshot - * @return set of partition numbers - */ - public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, - WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache); + protected HelixManager _manager; + protected static ScheduledRebalancer _scheduledRebalancer = new ScheduledRebalancer(); - /** - * Compute an assignment of tasks to instances - * @param currStateOutput the current state of the instances - * @param prevAssignment the previous task partition assignment - * @param instances the instances - * @param jobCfg the task configuration - * @param jobContext the task context - * @param workflowCfg the workflow configuration - * @param workflowCtx the workflow context - * @param partitionSet the partitions to assign - * @param cache cluster snapshot - * @return map of instances to set of partition 
numbers - */ - public abstract Map<String, SortedSet<Integer>> getTaskAssignment( - CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment, - Collection<String> instances, JobConfig jobCfg, JobContext jobContext, - WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet, - ClusterDataCache cache); - - @Override - public void init(HelixManager manager) { + @Override public void init(HelixManager manager) { _manager = manager; } - @Override - public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, - IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) { - final String resourceName = resource.getResourceName(); - LOG.debug("Computer Best Partition for resource: " + resourceName); - - // Fetch job configuration - JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName); - if (jobCfg == null) { - LOG.debug("Job configuration is NULL for " + resourceName); - return emptyAssignment(resourceName, currStateOutput); - } - String workflowResource = jobCfg.getWorkflow(); - - // Fetch workflow configuration and context - WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource); - if (workflowCfg == null) { - LOG.debug("Workflow configuration is NULL for " + resourceName); - return emptyAssignment(resourceName, currStateOutput); - } - WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource); - - // Initialize workflow context if needed - if (workflowCtx == null) { - workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext")); - workflowCtx.setStartTime(System.currentTimeMillis()); - LOG.info("Workflow context for " + resourceName + " created!"); - } - - // check ancestor job status - int notStartedCount = 0; - int inCompleteCount = 0; - for (String ancestor : workflowCfg.getJobDag().getAncestors(resourceName)) { - TaskState jobState = workflowCtx.getJobState(ancestor); - if (jobState == null || jobState == 
TaskState.NOT_STARTED) { - ++notStartedCount; - } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) { - ++inCompleteCount; - } - } - - if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) { - LOG.debug("Job is not ready to be scheduled due to pending dependent jobs " + resourceName); - return emptyAssignment(resourceName, currStateOutput); - } - - // Clean up if workflow marked for deletion - TargetState targetState = workflowCfg.getTargetState(); - if (targetState == TargetState.DELETE) { - LOG.info( - "Workflow is marked as deleted " + workflowResource - + " cleaning up the workflow context."); - cleanup(_manager, resourceName, workflowCfg, workflowResource); - return emptyAssignment(resourceName, currStateOutput); - } - - // Check if this workflow has been finished past its expiry. - if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED - && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) { - LOG.info("Workflow " + workflowResource - + " is completed and passed expiry time, cleaning up the workflow context."); - markForDeletion(_manager, workflowResource); - cleanup(_manager, resourceName, workflowCfg, workflowResource); - return emptyAssignment(resourceName, currStateOutput); - } - - // Fetch any existing context information from the property store. 
- JobContext jobCtx = TaskUtil.getJobContext(_manager, resourceName); - if (jobCtx == null) { - jobCtx = new JobContext(new ZNRecord("TaskContext")); - jobCtx.setStartTime(System.currentTimeMillis()); - } - - // Check for expired jobs for non-terminable workflows - long jobFinishTime = jobCtx.getFinishTime(); - if (!workflowCfg.isTerminable() && jobFinishTime != WorkflowContext.UNFINISHED - && jobFinishTime + workflowCfg.getExpiry() <= System.currentTimeMillis()) { - LOG.info("Job " + resourceName - + " is completed and passed expiry time, cleaning up the job context."); - cleanup(_manager, resourceName, workflowCfg, workflowResource); - return emptyAssignment(resourceName, currStateOutput); - } - - // The job is already in a final state (completed/failed). - if (workflowCtx.getJobState(resourceName) == TaskState.FAILED - || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) { - LOG.debug("Job " + resourceName + " is failed or already completed."); - return emptyAssignment(resourceName, currStateOutput); - } - - // Check for readiness, and stop processing if it's not ready - boolean isReady = - scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, resourceName, clusterData); - if (!isReady) { - LOG.debug("Job " + resourceName + " is not ready to be scheduled."); - return emptyAssignment(resourceName, currStateOutput); - } - - // Grab the old assignment, or an empty one if it doesn't exist - ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName); - if (prevAssignment == null) { - prevAssignment = new ResourceAssignment(resourceName); - } - - // Will contain the list of partitions that must be explicitly dropped from the ideal state that - // is stored in zk. - // Fetch the previous resource assignment from the property store. This is required because of - // HELIX-230. 
- Set<Integer> partitionsToDrop = new TreeSet<Integer>(); - - ResourceAssignment newAssignment = - computeResourceMapping(resourceName, workflowCfg, jobCfg, prevAssignment, clusterData - .getLiveInstances().keySet(), currStateOutput, workflowCtx, jobCtx, partitionsToDrop, - clusterData); - - if (!partitionsToDrop.isEmpty()) { - for (Integer pId : partitionsToDrop) { - taskIs.getRecord().getMapFields().remove(pName(resourceName, pId)); - } - HelixDataAccessor accessor = _manager.getHelixDataAccessor(); - PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName); - accessor.setProperty(propertyKey, taskIs); - } - - // Update rebalancer context, previous ideal state. - TaskUtil.setJobContext(_manager, resourceName, jobCtx); - TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx); - TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment); - - LOG.debug("Job " + resourceName + " new assignment " + Arrays - .toString(newAssignment.getMappedPartitions().toArray())); - - return newAssignment; - } - - private Set<String> getInstancesAssignedToOtherJobs(String currentJobName, - WorkflowConfig workflowCfg) { - - Set<String> ret = new HashSet<String>(); - - for (String jobName : workflowCfg.getJobDag().getAllNodes()) { - if (jobName.equals(currentJobName)) { - continue; - } - - JobContext jobContext = TaskUtil.getJobContext(_manager, jobName); - if (jobContext == null) { - continue; - } - for (int partition : jobContext.getPartitionSet()) { - TaskPartitionState partitionState = jobContext.getPartitionState(partition); - if (partitionState == TaskPartitionState.INIT || - partitionState == TaskPartitionState.RUNNING) { - ret.add(jobContext.getAssignedParticipant(partition)); - } - } - } - - return ret; - } - - private ResourceAssignment computeResourceMapping(String jobResource, - WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment, - Collection<String> liveInstances, CurrentStateOutput 
currStateOutput, - WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs, - ClusterDataCache cache) { - TargetState jobTgtState = workflowConfig.getTargetState(); - - // Update running status in workflow context - if (jobTgtState == TargetState.STOP) { - workflowCtx.setJobState(jobResource, TaskState.STOPPED); - // Workflow has been stopped if all jobs are stopped - if (isWorkflowStopped(workflowCtx, workflowConfig)) { - workflowCtx.setWorkflowState(TaskState.STOPPED); - } - } else { - workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS); - // Workflow is in progress if any task is in progress - workflowCtx.setWorkflowState(TaskState.IN_PROGRESS); - } - - // Used to keep track of tasks that have already been assigned to instances. - Set<Integer> assignedPartitions = new HashSet<Integer>(); - - // Used to keep track of tasks that have failed, but whose failure is acceptable - Set<Integer> skippedPartitions = new HashSet<Integer>(); - - // Keeps a mapping of (partition) -> (instance, state) - Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>(); - - Set<String> excludedInstances = getInstancesAssignedToOtherJobs(jobResource, workflowConfig); - - // Process all the current assignments of tasks. - Set<Integer> allPartitions = - getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache); - Map<String, SortedSet<Integer>> taskAssignments = - getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions); - long currentTime = System.currentTimeMillis(); - for (String instance : taskAssignments.keySet()) { - if (excludedInstances.contains(instance)) { - continue; - } - - Set<Integer> pSet = taskAssignments.get(instance); - // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT, - // TASK_ERROR, ERROR. 
- Set<Integer> donePartitions = new TreeSet<Integer>(); - for (int pId : pSet) { - final String pName = pName(jobResource, pId); - - // Check for pending state transitions on this (partition, instance). - Message pendingMessage = - currStateOutput.getPendingState(jobResource, new Partition(pName), instance); - if (pendingMessage != null) { - // There is a pending state transition for this (partition, instance). Just copy forward - // the state assignment from the previous ideal state. - Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName)); - if (stateMap != null) { - String prevState = stateMap.get(instance); - paMap.put(pId, new PartitionAssignment(instance, prevState)); - assignedPartitions.add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug(String - .format( - "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.", - pName, instance, prevState)); - } - } - - continue; - } - - TaskPartitionState currState = - TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition( - pName), instance)); - jobCtx.setPartitionState(pId, currState); - - // Process any requested state transitions. 
- String requestedStateStr = - currStateOutput.getRequestedState(jobResource, new Partition(pName), instance); - if (requestedStateStr != null && !requestedStateStr.isEmpty()) { - TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr); - if (requestedState.equals(currState)) { - LOG.warn(String.format( - "Requested state %s is the same as the current state for instance %s.", - requestedState, instance)); - } - - paMap.put(pId, new PartitionAssignment(instance, requestedState.name())); - assignedPartitions.add(pId); - LOG.debug(String.format( - "Instance %s requested a state transition to %s for partition %s.", instance, - requestedState, pName)); - continue; - } - - switch (currState) { - case RUNNING: - case STOPPED: { - TaskPartitionState nextState; - if (jobTgtState == TargetState.START) { - nextState = TaskPartitionState.RUNNING; - } else { - nextState = TaskPartitionState.STOPPED; - } - - paMap.put(pId, new PartitionAssignment(instance, nextState.name())); - assignedPartitions.add(pId); - LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, - nextState, instance)); - } - break; - case COMPLETED: { - // The task has completed on this partition. Mark as such in the context object. - donePartitions.add(pId); - LOG.debug(String - .format( - "Task partition %s has completed with state %s. Marking as such in rebalancer context.", - pName, currState)); - partitionsToDropFromIs.add(pId); - markPartitionCompleted(jobCtx, pId); - } - break; - case TIMED_OUT: - case TASK_ERROR: - case ERROR: { - donePartitions.add(pId); // The task may be rescheduled on a different instance. - LOG.debug(String.format( - "Task partition %s has error state %s. Marking as such in rebalancer context.", - pName, currState)); - markPartitionError(jobCtx, pId, currState, true); - // The error policy is to fail the task as soon a single partition fails for a specified - // maximum number of attempts. 
- if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) { - // If the user does not require this task to succeed in order for the job to succeed, - // then we don't have to fail the job right now - boolean successOptional = false; - String taskId = jobCtx.getTaskIdForPartition(pId); - if (taskId != null) { - TaskConfig taskConfig = jobCfg.getTaskConfig(taskId); - if (taskConfig != null) { - successOptional = taskConfig.isSuccessOptional(); - } - } - - // Similarly, if we have some leeway for how many tasks we can fail, then we don't have - // to fail the job immediately - if (skippedPartitions.size() < jobCfg.getFailureThreshold()) { - successOptional = true; - } - - if (!successOptional) { - long finishTime = currentTime; - workflowCtx.setJobState(jobResource, TaskState.FAILED); - if (workflowConfig.isTerminable()) { - workflowCtx.setWorkflowState(TaskState.FAILED); - workflowCtx.setFinishTime(finishTime); - } - jobCtx.setFinishTime(finishTime); - markAllPartitionsError(jobCtx, currState, false); - addAllPartitions(allPartitions, partitionsToDropFromIs); - return emptyAssignment(jobResource, currStateOutput); - } else { - skippedPartitions.add(pId); - partitionsToDropFromIs.add(pId); - } - } else { - // Mark the task to be started at some later time (if enabled) - markPartitionDelayed(jobCfg, jobCtx, pId); - } - } - break; - case INIT: - case DROPPED: { - // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned. - donePartitions.add(pId); - LOG.debug(String.format( - "Task partition %s has state %s. It will be dropped from the current ideal state.", - pName, currState)); - } - break; - default: - throw new AssertionError("Unknown enum symbol: " + currState); - } - } - - // Remove the set of task partitions that are completed or in one of the error states. 
- pSet.removeAll(donePartitions); - } - - // For delayed tasks, trigger a rebalance event for the closest upcoming ready time - scheduleForNextTask(jobResource, jobCtx, currentTime); - - if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) { - workflowCtx.setJobState(jobResource, TaskState.COMPLETED); - jobCtx.setFinishTime(currentTime); - if (isWorkflowComplete(workflowCtx, workflowConfig)) { - workflowCtx.setWorkflowState(TaskState.COMPLETED); - workflowCtx.setFinishTime(currentTime); - } - } - - // Make additional task assignments if needed. - if (jobTgtState == TargetState.START) { - // Contains the set of task partitions that must be excluded from consideration when making - // any new assignments. - // This includes all completed, failed, delayed, and already assigned partitions. - Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions); - addCompletedPartitions(excludeSet, jobCtx, allPartitions); - addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg); - excludeSet.addAll(skippedPartitions); - excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime)); - // Get instance->[partition, ...] mappings for the target resource. - Map<String, SortedSet<Integer>> tgtPartitionAssignments = - getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx, - workflowConfig, workflowCtx, allPartitions, cache); - for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) { - String instance = entry.getKey(); - if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances.contains(instance)) { - continue; - } - // Contains the set of task partitions currently assigned to the instance. 
- Set<Integer> pSet = entry.getValue(); - int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size(); - if (numToAssign > 0) { - List<Integer> nextPartitions = - getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign); - for (Integer pId : nextPartitions) { - String pName = pName(jobResource, pId); - paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name())); - excludeSet.add(pId); - jobCtx.setAssignedParticipant(pId, instance); - jobCtx.setPartitionState(pId, TaskPartitionState.INIT); - LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, - TaskPartitionState.RUNNING, instance)); - } - } - } - } - - // Construct a ResourceAssignment object from the map of partition assignments. - ResourceAssignment ra = new ResourceAssignment(jobResource); - for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) { - PartitionAssignment pa = e.getValue(); - ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())), - ImmutableMap.of(pa._instance, pa._state)); - } - - return ra; - } - - /** - * Check if a workflow is ready to schedule, and schedule a rebalance if it is not - * @param workflowCfg the workflow to check - * @param workflowCtx the current workflow context - * @param workflowResource the Helix resource associated with the workflow - * @param jobResource a job from the workflow - * @param cache the current snapshot of the cluster - * @return true if ready, false if not ready - */ - private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, WorkflowContext workflowCtx, - String workflowResource, String jobResource, ClusterDataCache cache) { - // Ignore non-scheduled workflows - if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) { - return true; - } - - // Figure out when this should be run, and if it's ready, then just run it - ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig(); - Date startTime = 
scheduleConfig.getStartTime(); - long currentTime = new Date().getTime(); - long delayFromStart = startTime.getTime() - currentTime; - - if (delayFromStart <= 0) { - // Remove any timers that are past-time for this workflow - Date scheduledTime = SCHEDULED_TIMES.get(workflowResource); - if (scheduledTime != null && currentTime > scheduledTime.getTime()) { - LOG.debug("Remove schedule timer for " + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource)); - SCHEDULED_TIMES.remove(workflowResource); - } - - // Recurring workflows are just templates that spawn new workflows - if (scheduleConfig.isRecurring()) { - // Skip scheduling this workflow if it's not in a start state - if (!workflowCfg.getTargetState().equals(TargetState.START)) { - LOG.debug( - "Skip scheduling since the workflow has not been started " + workflowResource); - return false; - } - - // Skip scheduling this workflow again if the previous run (if any) is still active - String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow(); - if (lastScheduled != null) { - WorkflowContext lastWorkflowCtx = TaskUtil.getWorkflowContext(_manager, lastScheduled); - if (lastWorkflowCtx != null - && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) { - LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled); - return false; - } - } - - // Figure out how many jumps are needed, thus the time to schedule the next workflow - // The negative of the delay is the amount of time past the start time - long period = - scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval()); - long offsetMultiplier = (-delayFromStart) / period; - long timeToSchedule = period * offsetMultiplier + startTime.getTime(); - - // Now clone the workflow if this clone has not yet been created - DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmssZ"); - // Now clone the workflow if this clone has not yet been created - String newWorkflowName = workflowResource + "_" + 
df.format(new java.util.Date(timeToSchedule)); - LOG.debug("Ready to start workflow " + newWorkflowName); - if (!newWorkflowName.equals(lastScheduled)) { - Workflow clonedWf = - TaskUtil.cloneWorkflow(_manager, workflowResource, newWorkflowName, new Date( - timeToSchedule)); - TaskDriver driver = new TaskDriver(_manager); - try { - // Start the cloned workflow - driver.start(clonedWf); - } catch (Exception e) { - LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e); - } - // Persist workflow start regardless of success to avoid retrying and failing - workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName); - TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx); - } - - // Change the time to trigger the pipeline to that of the next run - startTime = new Date(timeToSchedule + period); - delayFromStart = startTime.getTime() - System.currentTimeMillis(); - } else { - // This is a one-time workflow and is ready - return true; - } - } - - scheduleRebalance(workflowResource, jobResource, startTime, delayFromStart); - return false; - } - - private void scheduleRebalance(String id, String jobResource, Date startTime, long delayFromStart) { - // Do nothing if there is already a timer set for the this workflow with the same start time. 
- if ((SCHEDULED_TIMES.containsKey(id) && SCHEDULED_TIMES.get(id).equals(startTime)) - || SCHEDULED_TIMES.inverse().containsKey(startTime)) { - LOG.debug("Schedule timer for" + id + "and job: " + jobResource + " is up to date."); - return; - } - LOG.info( - "Schedule rebalance with id: " + id + "and job: " + jobResource + " at time: " + startTime - + " delay from start: " + delayFromStart); - - // For workflows not yet scheduled, schedule them and record it - RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource); - SCHEDULED_TIMES.put(id, startTime); - SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delayFromStart, TimeUnit.MILLISECONDS); - } - - private void scheduleForNextTask(String jobResource, JobContext ctx, long now) { - // Clear current entries if they exist and are expired - long currentTime = now; - Date scheduledTime = SCHEDULED_TIMES.get(jobResource); - if (scheduledTime != null && currentTime > scheduledTime.getTime()) { - LOG.debug("Remove schedule timer for" + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource)); - SCHEDULED_TIMES.remove(jobResource); - } - - // Figure out the earliest schedulable time in the future of a non-complete job - boolean shouldSchedule = false; - long earliestTime = Long.MAX_VALUE; - for (int p : ctx.getPartitionSet()) { - long retryTime = ctx.getNextRetryTime(p); - TaskPartitionState state = ctx.getPartitionState(p); - state = (state != null) ? 
state : TaskPartitionState.INIT; - Set<TaskPartitionState> errorStates = - Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR, - TaskPartitionState.TIMED_OUT); - if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) { - earliestTime = retryTime; - shouldSchedule = true; - } - } - - // If any was found, then schedule it - if (shouldSchedule) { - long delay = earliestTime - currentTime; - Date startTime = new Date(earliestTime); - scheduleRebalance(jobResource, jobResource, startTime, delay); - } - } - - /** - * Checks if the job has completed. - * @param ctx The rebalancer context. - * @param allPartitions The set of partitions to check. - * @param skippedPartitions partitions that failed, but whose failure is acceptable - * @return true if all task partitions have been marked with status - * {@link TaskPartitionState#COMPLETED} in the rebalancer - * context, false otherwise. - */ - private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions, - Set<Integer> skippedPartitions, JobConfig cfg) { - for (Integer pId : allPartitions) { - TaskPartitionState state = ctx.getPartitionState(pId); - if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED - && !isTaskGivenup(ctx, cfg, pId)) { - return false; - } - } - return true; - } + @Override public abstract ResourceAssignment computeBestPossiblePartitionState( + ClusterDataCache clusterData, IdealState taskIs, Resource resource, + CurrentStateOutput currStateOutput); /** * Checks if the workflow has completed. + * * @param ctx Workflow context containing job states * @param cfg Workflow config containing set of jobs * @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise. 
*/ - private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) { + protected boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) { if (!cfg.isTerminable()) { return false; } @@ -705,149 +82,23 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { /** * Checks if the workflow has been stopped. + * * @param ctx Workflow context containing task states * @param cfg Workflow config containing set of tasks * @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise. */ - private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) { + protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) { for (String job : cfg.getJobDag().getAllNodes()) { - if (ctx.getJobState(job) != TaskState.STOPPED && ctx.getJobState(job) != null) { + TaskState jobState = ctx.getJobState(job); + if (jobState != null && jobState != TaskState.COMPLETED && jobState != TaskState.FAILED + && jobState != TaskState.STOPPED) return false; - } } return true; } - private static void markForDeletion(HelixManager mgr, String resourceName) { - mgr.getConfigAccessor().set( - TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName), - WorkflowConfig.TARGET_STATE, TargetState.DELETE.name()); - } - - /** - * Cleans up all Helix state associated with this job, wiping workflow-level information if this - * is the last remaining job in its workflow, and the workflow is terminable. 
- */ - private static void cleanup(HelixManager mgr, final String resourceName, WorkflowConfig cfg, - String workflowResource) { - LOG.info("Cleaning up job: " + resourceName + " in workflow: " + workflowResource); - HelixDataAccessor accessor = mgr.getHelixDataAccessor(); - - // Remove any DAG references in workflow - PropertyKey workflowKey = getConfigPropertyKey(accessor, workflowResource); - DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() { - @Override - public ZNRecord update(ZNRecord currentData) { - JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG)); - for (String child : jobDag.getDirectChildren(resourceName)) { - jobDag.getChildrenToParents().get(child).remove(resourceName); - } - for (String parent : jobDag.getDirectParents(resourceName)) { - jobDag.getParentsToChildren().get(parent).remove(resourceName); - } - jobDag.getChildrenToParents().remove(resourceName); - jobDag.getParentsToChildren().remove(resourceName); - jobDag.getAllNodes().remove(resourceName); - try { - currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson()); - } catch (Exception e) { - LOG.equals("Could not update DAG for job: " + resourceName); - } - return currentData; - } - }; - accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover, - AccessOption.PERSISTENT); - - // Delete resource configs. - PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName); - if (!accessor.removeProperty(cfgKey)) { - throw new RuntimeException(String.format( - "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.", - resourceName, - cfgKey)); - } - - // Delete property store information for this resource. - // For recurring workflow, it's OK if the node doesn't exist. - String propStoreKey = getRebalancerPropStoreKey(resourceName); - mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT); - - // Delete the ideal state itself. 
- PropertyKey isKey = getISPropertyKey(accessor, resourceName); - if (!accessor.removeProperty(isKey)) { - throw new RuntimeException(String.format( - "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.", - resourceName, isKey)); - } - - // Delete dead external view - // because job is already completed, there is no more current state change - // thus dead external views removal will not be triggered - PropertyKey evKey = accessor.keyBuilder().externalView(resourceName); - accessor.removeProperty(evKey); - - LOG.info(String.format("Successfully cleaned up job resource %s.", resourceName)); - - boolean lastInWorkflow = true; - for (String job : cfg.getJobDag().getAllNodes()) { - // check if property store information or resource configs exist for this job - if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(job), - AccessOption.PERSISTENT) - || accessor.getProperty(getConfigPropertyKey(accessor, job)) != null - || accessor.getProperty(getISPropertyKey(accessor, job)) != null) { - lastInWorkflow = false; - break; - } - } - - // clean up workflow-level info if this was the last in workflow - if (lastInWorkflow && (cfg.isTerminable() || cfg.getTargetState() == TargetState.DELETE)) { - // delete workflow config - PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource); - if (!accessor.removeProperty(workflowCfgKey)) { - throw new RuntimeException( - String - .format( - "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.", - workflowResource, workflowCfgKey)); - } - // Delete property store information for this workflow - String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource); - if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) { - throw new RuntimeException( - String - .format( - "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. 
Aborting further clean up steps.", - workflowResource, workflowPropStoreKey)); - } - // Remove pending timer for this workflow if exists - if (SCHEDULED_TIMES.containsKey(workflowResource)) { - SCHEDULED_TIMES.remove(workflowResource); - } - } - - } - - private static String getRebalancerPropStoreKey(String resource) { - return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource); - } - - private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) { - return accessor.keyBuilder().idealStates(resource); - } - - private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) { - return accessor.keyBuilder().resourceConfig(resource); - } - - private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) { - for (Integer pId : toAdd) { - destination.add(pId); - } - } - - private static ResourceAssignment emptyAssignment(String name, CurrentStateOutput currStateOutput) { + protected ResourceAssignment buildEmptyAssignment(String name, + CurrentStateOutput currStateOutput) { ResourceAssignment assignment = new ResourceAssignment(name); Set<Partition> partitions = currStateOutput.getCurrentStateMappedPartitions(name); for (Partition partition : partitions) { @@ -861,164 +112,158 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { return assignment; } - private static void addCompletedPartitions(Set<Integer> set, JobContext ctx, - Iterable<Integer> pIds) { - for (Integer pId : pIds) { - TaskPartitionState state = ctx.getPartitionState(pId); - if (state == TaskPartitionState.COMPLETED) { - set.add(pId); - } - } - } - - private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) { - return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask(); - } + /** + * Check all the dependencies of a job to determine whether the job is ready to be scheduled. 
+ * + * @param job + * @param workflowCfg + * @param workflowCtx + * @return + */ + protected boolean isJobReadyToSchedule(String job, WorkflowConfig workflowCfg, + WorkflowContext workflowCtx) { + int notStartedCount = 0; + int inCompleteCount = 0; - // add all partitions that have been tried maxNumberAttempts - private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds, - JobConfig cfg) { - for (Integer pId : pIds) { - if (isTaskGivenup(ctx, cfg, pId)) { - set.add(pId); + for (String ancestor : workflowCfg.getJobDag().getAncestors(job)) { + TaskState jobState = workflowCtx.getJobState(ancestor); + if (jobState == null || jobState == TaskState.NOT_STARTED) { + ++notStartedCount; + } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) { + ++inCompleteCount; } } - } - - private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions, - Set<Integer> excluded, int n) { - List<Integer> result = new ArrayList<Integer>(); - for (Integer pId : candidatePartitions) { - if (result.size() >= n) { - break; - } - if (!excluded.contains(pId)) { - result.add(pId); - } + if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) { + LOG.debug(String + .format("Job %s is not ready to start, notStartedParent(s)=%d, inCompleteParent(s)=%d.", + job, notStartedCount, inCompleteCount)); + return false; } - return result; - } - - private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) { - long delayInterval = cfg.getTaskRetryDelay(); - if (delayInterval <= 0) { - return; - } - long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval; - ctx.setNextRetryTime(p, nextStartTime); + return true; } - private static void markPartitionCompleted(JobContext ctx, int pId) { - ctx.setPartitionState(pId, TaskPartitionState.COMPLETED); - ctx.setPartitionFinishTime(pId, System.currentTimeMillis()); - ctx.incrementNumAttempts(pId); + /** + * Check if a workflow is ready 
to schedule. + * + * @param workflowCfg the workflow to check + * @return true if the workflow is ready for schedule, false if not ready + */ + protected boolean isWorkflowReadyForSchedule(WorkflowConfig workflowCfg) { + Date startTime = workflowCfg.getStartTime(); + // Workflow with non-scheduled config or passed start time is ready to schedule. + return (startTime == null || startTime.getTime() <= System.currentTimeMillis()); } - private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state, - boolean incrementAttempts) { - ctx.setPartitionState(pId, state); - ctx.setPartitionFinishTime(pId, System.currentTimeMillis()); - if (incrementAttempts) { - ctx.incrementNumAttempts(pId); - } + @Override public IdealState computeNewIdealState(String resourceName, + IdealState currentIdealState, CurrentStateOutput currentStateOutput, + ClusterDataCache clusterData) { + // All of the heavy lifting is in the ResourceAssignment computation, + // so this part can just be a no-op. + return currentIdealState; } - private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state, - boolean incrementAttempts) { - for (int pId : ctx.getPartitionSet()) { - markPartitionError(ctx, pId, state, incrementAttempts); - } - } + // Management of already-scheduled rebalances across all task entities. + protected static class ScheduledRebalancer { + private class ScheduledTask { + long _startTime; + Future _future; - /** - * Return the assignment of task partitions per instance. 
- */ - private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments( - Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) { - Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>(); - for (String instance : instanceList) { - result.put(instance, new TreeSet<Integer>()); - } + public ScheduledTask(long _startTime, Future _future) { + this._startTime = _startTime; + this._future = _future; + } - for (Partition partition : assignment.getMappedPartitions()) { - int pId = pId(partition.getPartitionName()); - if (includeSet.contains(pId)) { - Map<String, String> replicaMap = assignment.getReplicaMap(partition); - for (String instance : replicaMap.keySet()) { - SortedSet<Integer> pList = result.get(instance); - if (pList != null) { - pList.add(pId); - } - } + public long getStartTime() { + return _startTime; } - } - return result; - } - private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) { - Set<Integer> nonReadyPartitions = Sets.newHashSet(); - for (int p : ctx.getPartitionSet()) { - long toStart = ctx.getNextRetryTime(p); - if (now < toStart) { - nonReadyPartitions.add(p); + public Future getFuture() { + return _future; } } - return nonReadyPartitions; - } - - /** - * Computes the partition name given the resource name and partition id. - */ - protected static String pName(String resource, int pId) { - return resource + "_" + pId; - } - /** - * Extracts the partition id from the given partition name. - */ - protected static int pId(String pName) { - String[] tokens = pName.split("_"); - return Integer.valueOf(tokens[tokens.length - 1]); - } - - /** - * An (instance, state) pair. 
- */ - private static class PartitionAssignment { - private final String _instance; - private final String _state; + private final Map<String, ScheduledTask> _rebalanceTasks = new HashMap<String, ScheduledTask>(); + private final ScheduledExecutorService _rebalanceExecutor = + Executors.newSingleThreadScheduledExecutor(); + + /** + * Add a future rebalance task for resource at given startTime + * + * @param resource + * @param startTime time in milliseconds + */ + public void scheduleRebalance(HelixManager manager, String resource, long startTime) { + // Do nothing if there is already a timer set for this workflow with the same start time. + ScheduledTask existTask = _rebalanceTasks.get(resource); + if (existTask != null && existTask.getStartTime() == startTime) { + LOG.debug("Schedule timer for job: " + resource + " is up to date."); + return; + } - private PartitionAssignment(String instance, String state) { - _instance = instance; - _state = state; + long delay = startTime - System.currentTimeMillis(); + LOG.info("Schedule rebalance with job: " + resource + " at time: " + startTime + " delay: " + + delay); + + // For workflow not yet scheduled, schedule them and record it + RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(manager, resource); + ScheduledFuture future = + _rebalanceExecutor.schedule(rebalanceInvoker, delay, TimeUnit.MILLISECONDS); + ScheduledTask prevTask = _rebalanceTasks.put(resource, new ScheduledTask(startTime, future)); + if (prevTask != null && !prevTask.getFuture().isDone()) { + if (!prevTask.getFuture().cancel(false)) { + LOG.warn("Failed to cancel scheduled timer task for " + resource); + } + } } - } - @Override - public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, - CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) { - // All of the heavy lifting is in the ResourceAssignment computation, - // so this part can just be a no-op.
- return currentIdealState; - } + /** + * Get the current schedule time for given resource. + * + * @param resource + * @return existing schedule time or -1 if there is no scheduled task for this resource + */ + public long getRebalanceTime(String resource) { + ScheduledTask task = _rebalanceTasks.get(resource); + if (task != null) { + return task.getStartTime(); + } + return -1; + } + + /** + * Remove all existing future schedule tasks for the given resource + * + * @param resource + */ + public void removeScheduledRebalance(String resource) { + ScheduledTask existTask = _rebalanceTasks.remove(resource); + if (existTask != null && !existTask.getFuture().isDone()) { + if (!existTask.getFuture().cancel(true)) { + LOG.warn("Failed to cancel scheduled timer task for " + resource); + } + LOG.info( + "Remove scheduled rebalance task at time " + existTask.getStartTime() + " for resource: " + + resource); + } + } - /** - * The simplest possible runnable that will trigger a run of the controller pipeline - */ - private static class RebalanceInvoker implements Runnable { - private final HelixManager _manager; - private final String _resource; + /** + * The simplest possible runnable that will trigger a run of the controller pipeline + */ + private class RebalanceInvoker implements Runnable { + private final HelixManager _manager; + private final String _resource; - public RebalanceInvoker(HelixManager manager, String resource) { - _manager = manager; - _resource = resource; - } + public RebalanceInvoker(HelixManager manager, String resource) { + _manager = manager; + _resource = resource; + } - @Override - public void run() { - TaskUtil.invokeRebalance(_manager, _resource); + @Override public void run() { + TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), _resource); + } } } }
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index bb62de5..d804fab 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import org.apache.helix.AccessOption; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.HelixProperty; import org.apache.helix.PropertyKey; @@ -56,14 +57,13 @@ import com.google.common.collect.Maps; public class TaskUtil { private static final Logger LOG = Logger.getLogger(TaskUtil.class); public static final String CONTEXT_NODE = "Context"; - public static final String PREV_RA_NODE = "PreviousResourceAssignment"; - /** * Parses job resource configurations in Helix into a {@link JobConfig} object. - * @param accessor Accessor to access Helix configs + * + * @param accessor Accessor to access Helix configs * @param jobResource The name of the job resource * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null - * otherwise. + * otherwise. */ public static JobConfig getJobCfg(HelixDataAccessor accessor, String jobResource) { HelixProperty jobResourceConfig = getResourceConfig(accessor, jobResource); @@ -85,10 +85,11 @@ public class TaskUtil { /** * Parses job resource configurations in Helix into a {@link JobConfig} object. - * @param manager HelixManager object used to connect to Helix. + * + * @param manager HelixManager object used to connect to Helix. * @param jobResource The name of the job resource. 
* @return A {@link JobConfig} object if Helix contains valid configurations for the job, null - * otherwise. + * otherwise. */ public static JobConfig getJobCfg(HelixManager manager, String jobResource) { return getJobCfg(manager.getHelixDataAccessor(), jobResource); @@ -96,12 +97,13 @@ public class TaskUtil { /** * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object. - * @param cfgAccessor Config accessor to access Helix configs - * @param accessor Accessor to access Helix configs - * @param clusterName Cluster name + * + * @param cfgAccessor Config accessor to access Helix configs + * @param accessor Accessor to access Helix configs + * @param clusterName Cluster name * @param workflowResource The name of the workflow resource. * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the - * workflow, null otherwise. + * workflow, null otherwise. */ public static WorkflowConfig getWorkflowCfg(ConfigAccessor cfgAccessor, HelixDataAccessor accessor, String clusterName, String workflowResource) { @@ -117,10 +119,11 @@ public class TaskUtil { /** * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object. - * @param manager Helix manager object used to connect to Helix. + * + * @param manager Helix manager object used to connect to Helix. * @param workflowResource The name of the workflow resource. * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the - * workflow, null otherwise. + * workflow, null otherwise. */ public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) { return getWorkflowCfg(manager.getConfigAccessor(), manager.getHelixDataAccessor(), @@ -129,18 +132,19 @@ public class TaskUtil { /** * Request a state change for a specific task. 
- * @param accessor connected Helix data accessor - * @param instance the instance serving the task + * + * @param accessor connected Helix data accessor + * @param instance the instance serving the task * @param sessionId the current session of the instance - * @param resource the job name + * @param resource the job name * @param partition the task partition name - * @param state the requested state + * @param state the requested state * @return true if the request was persisted, false otherwise */ public static boolean setRequestedState(HelixDataAccessor accessor, String instance, String sessionId, String resource, String partition, TaskPartitionState state) { - LOG.debug(String.format("Requesting a state transition to %s for partition %s.", state, - partition)); + LOG.debug( + String.format("Requesting a state transition to %s for partition %s.", state, partition)); try { PropertyKey.Builder keyBuilder = accessor.keyBuilder(); PropertyKey key = keyBuilder.currentState(instance, sessionId, resource); @@ -149,16 +153,18 @@ public class TaskUtil { return accessor.updateProperty(key, currStateDelta); } catch (Exception e) { - LOG.error(String.format("Error when requesting a state transition to %s for partition %s.", - state, partition), e); + LOG.error(String + .format("Error when requesting a state transition to %s for partition %s.", state, + partition), e); return false; } } /** * Get a Helix configuration scope at a resource (i.e. 
job and workflow) level + * * @param clusterName the cluster containing the resource - * @param resource the resource name + * @param resource the resource name * @return instantiated {@link HelixConfigScope} */ public static HelixConfigScope getResourceConfigScope(String clusterName, String resource) { @@ -167,51 +173,24 @@ public class TaskUtil { } /** - * Get the last task assignment for a given job - * @param manager a connection to Helix - * @param resourceName the name of the job - * @return {@link ResourceAssignment} instance, or null if no assignment is available - */ - public static ResourceAssignment getPrevResourceAssignment(HelixManager manager, - String resourceName) { - ZNRecord r = - manager.getHelixPropertyStore().get( - Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE), - null, AccessOption.PERSISTENT); - return r != null ? new ResourceAssignment(r) : null; - } - - /** - * Set the last task assignment for a given job - * @param manager a connection to Helix - * @param resourceName the name of the job - * @param ra {@link ResourceAssignment} containing the task assignment - */ - public static void setPrevResourceAssignment(HelixManager manager, String resourceName, - ResourceAssignment ra) { - manager.getHelixPropertyStore().set( - Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE), - ra.getRecord(), AccessOption.PERSISTENT); - } - - /** * Get the runtime context of a single job + * * @param propertyStore Property store for the cluster - * @param jobResource The name of the job + * @param jobResource The name of the job * @return the {@link JobContext}, or null if none is available */ public static JobContext getJobContext(HelixPropertyStore<ZNRecord> propertyStore, String jobResource) { - ZNRecord r = - propertyStore.get( - Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE), + ZNRecord r = propertyStore + 
.get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE), null, AccessOption.PERSISTENT); return r != null ? new JobContext(r) : null; } /** * Get the runtime context of a single job - * @param manager a connection to Helix + * + * @param manager a connection to Helix * @param jobResource the name of the job * @return the {@link JobContext}, or null if none is available */ @@ -221,34 +200,36 @@ public class TaskUtil { /** * Set the runtime context of a single job - * @param manager a connection to Helix + * + * @param manager a connection to Helix * @param jobResource the name of the job - * @param ctx the up-to-date {@link JobContext} for the job + * @param ctx the up-to-date {@link JobContext} for the job */ public static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) { - manager.getHelixPropertyStore().set( - Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE), - ctx.getRecord(), AccessOption.PERSISTENT); + manager.getHelixPropertyStore() + .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE), + ctx.getRecord(), AccessOption.PERSISTENT); } /** * Get the runtime context of a single workflow - * @param propertyStore Property store of the cluster + * + * @param propertyStore Property store of the cluster * @param workflowResource The name of the workflow * @return the {@link WorkflowContext}, or null if none is available */ public static WorkflowContext getWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore, String workflowResource) { - ZNRecord r = - propertyStore.get( - Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource, - CONTEXT_NODE), null, AccessOption.PERSISTENT); + ZNRecord r = propertyStore.get( + Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource, CONTEXT_NODE), + null, AccessOption.PERSISTENT); return r != null ? 
new WorkflowContext(r) : null; } /** * Get the runtime context of a single workflow - * @param manager a connection to Helix + * + * @param manager a connection to Helix * @param workflowResource the name of the workflow * @return the {@link WorkflowContext}, or null if none is available */ @@ -258,9 +239,10 @@ public class TaskUtil { /** * Set the runtime context of a single workflow - * @param manager a connection to Helix + * + * @param manager a connection to Helix * @param workflowResource the name of the workflow - * @param ctx the up-to-date {@link WorkflowContext} for the workflow + * @param ctx the up-to-date {@link WorkflowContext} for the workflow */ public static void setWorkflowContext(HelixManager manager, String workflowResource, WorkflowContext ctx) { @@ -271,6 +253,7 @@ public class TaskUtil { /** * Get a workflow-qualified job name for a single-job workflow + * * @param singleJobWorkflow the name of the single-job workflow * @return The namespaced job name, which is just singleJobWorkflow_singleJobWorkflow */ @@ -280,8 +263,9 @@ public class TaskUtil { /** * Get a workflow-qualified job name for a job in that workflow + * * @param workflowResource the name of the workflow - * @param jobName the un-namespaced name of the job + * @param jobName the un-namespaced name of the job * @return The namespaced job name, which is just workflowResource_jobName */ public static String getNamespacedJobName(String workflowResource, String jobName) { @@ -290,8 +274,9 @@ public class TaskUtil { /** * Remove the workflow namespace from the job name + * * @param workflowResource the name of the workflow that owns the job - * @param jobName the namespaced job name + * @param jobName the namespaced job name * @return the denamespaced job name, or the same job name if it is already denamespaced */ public static String getDenamespacedJobName(String workflowResource, String jobName) { @@ -305,6 +290,7 @@ public class TaskUtil { /** * Serialize a map of job-level 
configurations as a single string + * * @param commandConfig map of job config key to config value * @return serialized string */ @@ -321,6 +307,7 @@ public class TaskUtil { /** * Deserialize a single string into a map of job-level configurations + * * @param commandConfig the serialized job config map * @return a map of job config key to config value */ @@ -339,22 +326,27 @@ public class TaskUtil { /** * Trigger a controller pipeline execution for a given resource. - * @param manager Helix connection + * + * @param accessor Helix data accessor * @param resource the name of the resource changed to triggering the execution */ - public static void invokeRebalance(HelixManager manager, String resource) { + public static void invokeRebalance(HelixDataAccessor accessor, String resource) { // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run - HelixDataAccessor accessor = manager.getHelixDataAccessor(); + LOG.info("invoke rebalance for " + resource); PropertyKey key = accessor.keyBuilder().idealStates(resource); IdealState is = accessor.getProperty(key); - if (is != null) { - accessor.updateProperty(key, is); - LOG.debug("invoke rebalance for " + key); + if (is != null && is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) { + if (!accessor.updateProperty(key, is)) { + LOG.warn("Failed to invoke rebalance on resource " + resource); + } + } else { + LOG.warn("Can't find ideal state or ideal state is not for right type for " + resource); } } /** * Get a ScheduleConfig from a workflow config string map + * * @param cfg the string map * @return a ScheduleConfig if one exists, otherwise null */ @@ -369,11 +361,11 @@ public class TaskUtil { return null; } } - if (cfg.containsKey(WorkflowConfig.RECURRENCE_UNIT) - && cfg.containsKey(WorkflowConfig.RECURRENCE_INTERVAL)) { - return ScheduleConfig.recurringFromDate(startTime, - TimeUnit.valueOf(cfg.get(WorkflowConfig.RECURRENCE_UNIT)), - 
Long.parseLong(cfg.get(WorkflowConfig.RECURRENCE_INTERVAL))); + if (cfg.containsKey(WorkflowConfig.RECURRENCE_UNIT) && cfg + .containsKey(WorkflowConfig.RECURRENCE_INTERVAL)) { + return ScheduleConfig + .recurringFromDate(startTime, TimeUnit.valueOf(cfg.get(WorkflowConfig.RECURRENCE_UNIT)), + Long.parseLong(cfg.get(WorkflowConfig.RECURRENCE_INTERVAL))); } else if (startTime != null) { return ScheduleConfig.oneTimeDelayedStart(startTime); } @@ -382,10 +374,11 @@ public class TaskUtil { /** * Create a new workflow based on an existing one - * @param manager connection to Helix + * + * @param manager connection to Helix * @param origWorkflowName the name of the existing workflow - * @param newWorkflowName the name of the new workflow - * @param newStartTime a provided start time that deviates from the desired start time + * @param newWorkflowName the name of the new workflow + * @param newStartTime a provided start time that deviates from the desired start time * @return the cloned workflow, or null if there was a problem cloning the existing one */ public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName, @@ -474,4 +467,61 @@ public class TaskUtil { PropertyKey.Builder keyBuilder = accessor.keyBuilder(); return accessor.getProperty(keyBuilder.resourceConfig(resource)); } + + /** + * Cleans up IdealState and external view associated with a job/workflow resource. + */ + public static void cleanupIdealStateExtView(HelixDataAccessor accessor, final String resourceName) { + LOG.info("Cleaning up idealstate and externalView for job: " + resourceName); + + // Delete the ideal state itself. + PropertyKey isKey = accessor.keyBuilder().idealStates(resourceName); + if (accessor.getProperty(isKey) != null) { + if (!accessor.removeProperty(isKey)) { + LOG.error(String.format( + "Error occurred while trying to clean up resource %s. 
Failed to remove node %s from Helix.", + resourceName, isKey)); + } + } else { + LOG.warn(String.format("Idealstate for resource %s does not exist.", resourceName)); + } + + // Delete dead external view + // because job is already completed, there is no more current state change + // thus dead external views removal will not be triggered + PropertyKey evKey = accessor.keyBuilder().externalView(resourceName); + if (accessor.getProperty(evKey) != null) { + if (!accessor.removeProperty(evKey)) { + LOG.error(String.format( + "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.", + resourceName, evKey)); + } + } + + LOG.info(String + .format("Successfully clean up idealstate/externalView for resource %s.", resourceName)); + } + + /** + * Extracts the partition id from the given partition name. + * + * @param pName + * @return + */ + public static int getPartitionId(String pName) { + int index = pName.lastIndexOf("_"); + if (index == -1) { + throw new HelixException("Invalid partition name " + pName); + } + return Integer.valueOf(pName.substring(index + 1)); + } + + public static String getWorkflowContextKey(String resource) { + // TODO: fix this to use the keyBuilder. 
+ return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource); + } + + public static PropertyKey getWorkflowConfigKey(HelixDataAccessor accessor, String resource) { + return accessor.keyBuilder().resourceConfig(resource); + } } http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/Workflow.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java index 259b72c..8ea2691 100644 --- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java +++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java @@ -230,7 +230,7 @@ public class Workflow { protected Map<String, Map<String, String>> _jobConfigs; protected Map<String, List<TaskConfig>> _taskConfigs; protected ScheduleConfig _scheduleConfig; - protected long _expiry; + protected long _expiry = -1; protected Map<String, String> _cfgMap; protected int _parallelJobs = -1; @@ -239,7 +239,7 @@ public class Workflow { _dag = new JobDag(); _jobConfigs = new TreeMap<String, Map<String, String>>(); _taskConfigs = new TreeMap<String, List<TaskConfig>>(); - _expiry = WorkflowConfig.DEFAULT_EXPIRY; + _expiry = -1; } public Builder addConfig(String job, String key, String val) { @@ -340,7 +340,7 @@ public class Workflow { if (_expiry > 0) { builder.setExpiry(_expiry); } - if (_parallelJobs != -1) { + if (_parallelJobs > 0) { builder.setParallelJobs(_parallelJobs); } http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java new file mode 100644 index 0000000..912f501 --- /dev/null +++ 
b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -0,0 +1,412 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.I0Itec.zkclient.DataUpdater; +import org.apache.helix.*; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.*; +import org.apache.helix.model.builder.CustomModeISBuilder; +import org.apache.log4j.Logger; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; + +/** + * Custom rebalancer implementation for the {@code Workflow} in task state model. 
+ */ +public class WorkflowRebalancer extends TaskRebalancer { + private static final Logger LOG = Logger.getLogger(WorkflowRebalancer.class); + + @Override + public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, + IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) { + final String workflow = resource.getResourceName(); + LOG.debug("Computer Best Partition for workflow: " + workflow); + + // Fetch workflow configuration and context + WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflow); + if (workflowCfg == null) { + LOG.warn("Workflow configuration is NULL for " + workflow); + return buildEmptyAssignment(workflow, currStateOutput); + } + + WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflow); + // Initialize workflow context if needed + if (workflowCtx == null) { + workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext")); + workflowCtx.setStartTime(System.currentTimeMillis()); + LOG.debug("Workflow context is created for " + workflow); + } + + // Clean up if workflow marked for deletion + TargetState targetState = workflowCfg.getTargetState(); + if (targetState == TargetState.DELETE) { + LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context."); + cleanupWorkflow(workflow, workflowCfg, workflowCtx); + return buildEmptyAssignment(workflow, currStateOutput); + } + + if (targetState == TargetState.STOP) { + LOG.info("Workflow " + workflow + "is marked as stopped."); + // Workflow has been stopped if all jobs are stopped + // TODO: what should we do if workflowCtx is not set yet? + if (workflowCtx != null && isWorkflowStopped(workflowCtx, workflowCfg)) { + workflowCtx.setWorkflowState(TaskState.STOPPED); + } + return buildEmptyAssignment(workflow, currStateOutput); + } + + long currentTime = System.currentTimeMillis(); + // Check if workflow is completed and mark it if it is completed. 
+ if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) { + if (isWorkflowComplete(workflowCtx, workflowCfg)) { + workflowCtx.setWorkflowState(TaskState.COMPLETED); + workflowCtx.setFinishTime(currentTime); + TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx); + } + } + + if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) { + LOG.info("Workflow " + workflow + " is completed."); + long expiryTime = workflowCfg.getExpiry(); + // Check if this workflow has been finished past its expiry. + if (workflowCtx.getFinishTime() + expiryTime <= currentTime) { + LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context."); + cleanupWorkflow(workflow, workflowCfg, workflowCtx); + } else { + // schedule future cleanup work + long cleanupTime = workflowCtx.getFinishTime() + expiryTime; + _scheduledRebalancer.scheduleRebalance(_manager, workflow, cleanupTime); + } + return buildEmptyAssignment(workflow, currStateOutput); + } + + if (!isWorkflowReadyForSchedule(workflowCfg)) { + LOG.info("Workflow " + workflow + " is not ready to schedule"); + // set the timer to trigger future schedule + _scheduledRebalancer + .scheduleRebalance(_manager, workflow, workflowCfg.getStartTime().getTime()); + return buildEmptyAssignment(workflow, currStateOutput); + } + + // Check for readiness, and stop processing if it's not ready + boolean isReady = + scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx); + if (isReady) { + // Schedule jobs from this workflow. 
+ scheduleJobs(workflowCfg, workflowCtx); + } else { + LOG.debug("Workflow " + workflow + " is not ready to be scheduled."); + } + + TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx); + return buildEmptyAssignment(workflow, currStateOutput); + } + + /** + * Figure out whether the jobs in the workflow should be run, + * and if it's ready, then just schedule it + */ + private void scheduleJobs(WorkflowConfig workflowCfg, WorkflowContext workflowCtx) { + ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig(); + if (scheduleConfig != null && scheduleConfig.isRecurring()) { + LOG.debug("Jobs from recurring workflow are not schedule-able"); + return; + } + + for (String job : workflowCfg.getJobDag().getAllNodes()) { + TaskState jobState = workflowCtx.getJobState(job); + if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) { + LOG.debug("Job " + job + " is already started or completed."); + continue; + } + // check ancestor job status + if (isJobReadyToSchedule(job, workflowCfg, workflowCtx)) { + JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job); + scheduleSingleJob(job, jobConfig); + } + } + } + + /** + * Posts new job to cluster + */ + private void scheduleSingleJob(String jobResource, JobConfig jobConfig) { + HelixAdmin admin = _manager.getClusterManagmentTool(); + + IdealState jobIS = admin.getResourceIdealState(_manager.getClusterName(), jobResource); + if (jobIS != null) { + LOG.info("Job " + jobResource + " idealstate already exists!"); + return; + } + + // Set up job resource based on partitions from target resource + int numIndependentTasks = jobConfig.getTaskConfigMap().size(); + int numPartitions = (numIndependentTasks > 0) ? 
+ numIndependentTasks : + admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource()) + .getPartitionSet().size(); + admin.addResource(_manager.getClusterName(), jobResource, numPartitions, + TaskConstants.STATE_MODEL_NAME); + + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + + // Set the job configuration + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + HelixProperty resourceConfig = new HelixProperty(jobResource); + resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap()); + Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap(); + if (taskConfigMap != null) { + for (TaskConfig taskConfig : taskConfigMap.values()) { + resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap()); + } + } + accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig); + + // Push out new ideal state based on number of target partitions + CustomModeISBuilder builder = new CustomModeISBuilder(jobResource); + builder.setRebalancerMode(IdealState.RebalanceMode.TASK); + builder.setNumReplica(1); + builder.setNumPartitions(numPartitions); + builder.setStateModel(TaskConstants.STATE_MODEL_NAME); + + if (jobConfig.isDisableExternalView()) { + builder.setDisableExternalView(true); + } + + jobIS = builder.build(); + for (int i = 0; i < numPartitions; i++) { + jobIS.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>()); + jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, String>()); + } + jobIS.setRebalancerClassName(JobRebalancer.class.getName()); + admin.setResourceIdealState(_manager.getClusterName(), jobResource, jobIS); + } + + /** + * Check if a workflow is ready to schedule, and schedule a rebalance if it is not + * + * @param workflow the Helix resource associated with the workflow + * @param workflowCfg the workflow to check + * @param workflowCtx the current workflow context + * @return true if the workflow 
is ready for schedule, false if not ready + */ + private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig workflowCfg, + WorkflowContext workflowCtx) { + // non-scheduled workflow is ready to run immediately. + if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) { + return true; + } + + // Figure out when this should be run, and if it's ready, then just run it + ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig(); + Date startTime = scheduleConfig.getStartTime(); + long currentTime = new Date().getTime(); + long delayFromStart = startTime.getTime() - currentTime; + + if (delayFromStart <= 0) { + // Recurring workflows are just templates that spawn new workflows + if (scheduleConfig.isRecurring()) { + // Skip scheduling this workflow if it's not in a start state + if (!workflowCfg.getTargetState().equals(TargetState.START)) { + LOG.debug("Skip scheduling since the workflow has not been started " + workflow); + return false; + } + + // Skip scheduling this workflow again if the previous run (if any) is still active + String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow(); + if (lastScheduled != null) { + WorkflowContext lastWorkflowCtx = TaskUtil.getWorkflowContext(_manager, lastScheduled); + if (lastWorkflowCtx != null + && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) { + LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled); + return false; + } + } + + // Figure out how many jumps are needed, thus the time to schedule the next workflow + // The negative of the delay is the amount of time past the start time + long period = + scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval()); + long offsetMultiplier = (-delayFromStart) / period; + long timeToSchedule = period * offsetMultiplier + startTime.getTime(); + + // Now clone the workflow if this clone has not yet been created + DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss"); 
+ df.setTimeZone(TimeZone.getTimeZone("UTC")); + String newWorkflowName = workflow + "_" + df.format(new Date(timeToSchedule)); + LOG.debug("Ready to start workflow " + newWorkflowName); + if (!newWorkflowName.equals(lastScheduled)) { + Workflow clonedWf = TaskUtil + .cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule)); + TaskDriver driver = new TaskDriver(_manager); + try { + // Start the cloned workflow + driver.start(clonedWf); + } catch (Exception e) { + LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e); + } + // Persist workflow start regardless of success to avoid retrying and failing + workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName); + TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx); + } + + // Change the time to trigger the pipeline to that of the next run + _scheduledRebalancer.scheduleRebalance(_manager, workflow, (timeToSchedule + period)); + } else { + // one time workflow. + // Remove any timers that are past-time for this workflowg + long scheduledTime = _scheduledRebalancer.getRebalanceTime(workflow); + if (scheduledTime > 0 && currentTime > scheduledTime) { + _scheduledRebalancer.removeScheduledRebalance(workflow); + } + return true; + } + } else { + // set the timer to trigger future schedule + _scheduledRebalancer.scheduleRebalance(_manager, workflow, startTime.getTime()); + } + + return false; + } + + /** + * Cleans up workflow configs and workflow contexts associated with this workflow, + * including all job-level configs and context, plus workflow-level information. 
+ */ + private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg, + WorkflowContext workflowCtx) { + LOG.info("Cleaning up workflow: " + workflow); + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + + /* + if (workflowCtx != null && workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) { + LOG.error("Workflow " + workflow + " has not completed, abort the clean up task."); + return; + }*/ + + for (String job : workflowcfg.getJobDag().getAllNodes()) { + cleanupJob(job, workflow); + } + + // clean up workflow-level info if this was the last in workflow + if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) { + // clean up IS & EV + TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), workflow); + + // delete workflow config + PropertyKey workflowCfgKey = TaskUtil.getWorkflowConfigKey(accessor, workflow); + if (accessor.getProperty(workflowCfgKey) != null) { + if (!accessor.removeProperty(workflowCfgKey)) { + LOG.error(String.format( + "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix.", + workflow, workflowCfgKey)); + } + } + // Delete workflow context + String workflowPropStoreKey = TaskUtil.getWorkflowContextKey(workflow); + LOG.info("Removing workflow context: " + workflowPropStoreKey); + if (!_manager.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) { + LOG.error(String.format( + "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.", + workflow, workflowPropStoreKey)); + } + + // Remove pending timer task for this workflow if exists + _scheduledRebalancer.removeScheduledRebalance(workflow); + } + } + + + /** + * Cleans up workflow configs and workflow contexts associated with this workflow, + * including all job-level configs and context, plus workflow-level information. 
+ */ + private void cleanupJob(final String job, String workflow) { + LOG.info("Cleaning up job: " + job + " in workflow: " + workflow); + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + + // Remove any idealstate and externalView. + TaskUtil.cleanupIdealStateExtView(accessor, job); + + // Remove DAG references in workflow + PropertyKey workflowKey = TaskUtil.getWorkflowConfigKey(accessor, workflow); + DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() { + @Override + public ZNRecord update(ZNRecord currentData) { + if (currentData != null) { + JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG)); + for (String child : jobDag.getDirectChildren(job)) { + jobDag.getChildrenToParents().get(child).remove(job); + } + for (String parent : jobDag.getDirectParents(job)) { + jobDag.getParentsToChildren().get(parent).remove(job); + } + jobDag.getChildrenToParents().remove(job); + jobDag.getParentsToChildren().remove(job); + jobDag.getAllNodes().remove(job); + try { + currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson()); + } catch (Exception e) { + LOG.error("Could not update DAG for job: " + job, e); + } + } else { + LOG.error("Could not update DAG for job: " + job + " ZNRecord is null."); + } + return currentData; + } + }; + accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover, + AccessOption.PERSISTENT); + + // Delete job configs. + PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(accessor, job); + if (accessor.getProperty(cfgKey) != null) { + if (!accessor.removeProperty(cfgKey)) { + LOG.error(String.format( + "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix.", + job, cfgKey)); + } + } + + // Delete job context + // For recurring workflow, it's OK if the node doesn't exist. 
+ String propStoreKey = TaskUtil.getWorkflowContextKey(job); + if (!_manager.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT)) { + LOG.warn(String.format( + "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix.", + job, propStoreKey)); + } + + LOG.info(String.format("Successfully cleaned up job context %s.", job)); + + _scheduledRebalancer.removeScheduledRebalance(job); + } + + @Override + public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, + CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) { + // Nothing to do here with workflow resource. + return currentIdealState; + } +}
