Replace BestPossibleStage with TaskSchedulingStage. For the new task framework, we don't have to loop through all the ideal states. As a first step, TaskSchedulingStage is a simplified version of BestPossibleStage.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/50c9aa3a Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/50c9aa3a Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/50c9aa3a Branch: refs/heads/master Commit: 50c9aa3a2eaf82e75743c693b7e40ad741e51198 Parents: c45d3a6 Author: Junkai Xue <[email protected]> Authored: Tue Jul 17 17:56:36 2018 -0700 Committer: Lei Xia <[email protected]> Committed: Mon Sep 17 15:45:45 2018 -0700 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 2 +- .../stages/BestPossibleStateCalcStage.java | 57 +----- .../controller/stages/TaskSchedulingStage.java | 204 +++++++++++++++++++ .../task/TestWorkflowTermination.java | 26 +-- 4 files changed, 227 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/50c9aa3a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 9f94755..48677d3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -306,7 +306,7 @@ public class GenericHelixController implements IdealStateChangeListener, // TODO: Junkai will work on refactoring existing pipeline log into abstract logic and // extend the logic to separate pipeline Pipeline rebalancePipeline = new Pipeline(pipelineName); - rebalancePipeline.addStage(new BestPossibleStateCalcStage()); + rebalancePipeline.addStage(new TaskSchedulingStage()); rebalancePipeline.addStage(new IntermediateStateCalcStage()); 
rebalancePipeline.addStage(new MessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); http://git-wip-us.apache.org/repos/asf/helix/blob/50c9aa3a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index a6ba5b8..5cc9593 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -103,35 +103,25 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); BestPossibleStateOutput output = new BestPossibleStateOutput(); - PriorityQueue<ResourcePriority> resourcePriorityQueue = new PriorityQueue<>(); - TaskDriver taskDriver = null; HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name()); - if (helixManager != null) { - taskDriver = new TaskDriver(helixManager); - } - for (Resource resource : resourceMap.values()) { - resourcePriorityQueue.add(new ResourcePriority(resource, cache.getIdealState(resource.getResourceName()), - taskDriver)); - } final List<String> failureResources = new ArrayList<>(); - Iterator<ResourcePriority> itr = resourcePriorityQueue.iterator(); + Iterator<Resource> itr = resourceMap.values().iterator(); while (itr.hasNext()) { - Resource resource = itr.next().getResource(); + Resource resource = itr.next(); if (!computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output)) { failureResources.add(resource.getResourceName()); - LogUtil.logWarn(logger, _eventId, "Failed to calculate best possible states for " + 
resource.getResourceName()); + LogUtil.logWarn(logger, _eventId, + "Failed to calculate best possible states for " + resource.getResourceName()); } } // Check and report if resource rebalance has failure - if (!cache.isTaskCache()) { - ClusterStatusMonitor clusterStatusMonitor = - event.getAttribute(AttributeName.clusterStatusMonitor.name()); - updateRebalanceStatus(!failureResources.isEmpty(), helixManager, cache, clusterStatusMonitor, - "Failed to calculate best possible states for " + failureResources.size() - + " resources."); - } + ClusterStatusMonitor clusterStatusMonitor = + event.getAttribute(AttributeName.clusterStatusMonitor.name()); + updateRebalanceStatus(!failureResources.isEmpty(), helixManager, cache, clusterStatusMonitor, + "Failed to calculate best possible states for " + failureResources.size() + " resources."); + return output; } @@ -356,33 +346,4 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { return mappingCalculator; } - - class ResourcePriority implements Comparable<ResourcePriority> { - final Resource _resource; - // By default, non-job resources and new jobs are assigned lowest priority - Long _priority = Long.MAX_VALUE; - - Resource getResource() { - return _resource; - } - - public ResourcePriority(Resource resource, IdealState idealState, TaskDriver taskDriver) { - _resource = resource; - - if (taskDriver != null && idealState != null - && idealState.getRebalancerClassName() != null - && idealState.getRebalancerClassName().equals(JobRebalancer.class.getName())) { - // Update priority for job resources, note that older jobs will be processed earlier - JobContext jobContext = taskDriver.getJobContext(resource.getResourceName()); - if (jobContext != null && jobContext.getStartTime() != WorkflowContext.UNSTARTED) { - _priority = jobContext.getStartTime(); - } - } - } - - @Override - public int compareTo(ResourcePriority otherJob) { - return _priority.compareTo(otherJob._priority); - } - } } 
http://git-wip-us.apache.org/repos/asf/helix/blob/50c9aa3a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskSchedulingStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskSchedulingStage.java new file mode 100644 index 0000000..e2596b1 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskSchedulingStage.java @@ -0,0 +1,204 @@ +package org.apache.helix.controller.stages; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import org.apache.helix.HelixManager; +import org.apache.helix.controller.LogUtil; +import org.apache.helix.controller.pipeline.AbstractBaseStage; +import org.apache.helix.controller.pipeline.StageException; +import org.apache.helix.controller.rebalancer.Rebalancer; +import org.apache.helix.controller.rebalancer.internal.MappingCalculator; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.Partition; +import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; +import org.apache.helix.task.JobContext; +import org.apache.helix.task.JobRebalancer; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskRebalancer; +import org.apache.helix.task.WorkflowContext; +import org.apache.helix.util.HelixUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TaskSchedulingStage extends AbstractBaseStage { + private static final Logger logger = LoggerFactory.getLogger(TaskSchedulingStage.class.getName()); + + @Override + public void process(ClusterEvent event) throws Exception { + _eventId = event.getEventId(); + CurrentStateOutput currentStateOutput = + 
event.getAttribute(AttributeName.CURRENT_STATE.name()); + final Map<String, Resource> resourceMap = + event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name()); + ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); + + if (currentStateOutput == null || resourceMap == null || cache == null) { + throw new StageException( + "Missing attributes in event:" + event + ". Requires CURRENT_STATE|RESOURCES|DataCache"); + } + + // Reset current INIT/RUNNING tasks on participants for throttling + cache.resetActiveTaskCount(currentStateOutput); + + final BestPossibleStateOutput bestPossibleStateOutput = + compute(event, resourceMap, currentStateOutput); + event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); + + } + + private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap, + CurrentStateOutput currentStateOutput) { + ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); + BestPossibleStateOutput output = new BestPossibleStateOutput(); + + PriorityQueue<TaskSchedulingStage.ResourcePriority> resourcePriorityQueue = + new PriorityQueue<>(); + TaskDriver taskDriver = null; + HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name()); + if (helixManager != null) { + taskDriver = new TaskDriver(helixManager); + } + for (Resource resource : resourceMap.values()) { + resourcePriorityQueue.add(new TaskSchedulingStage.ResourcePriority(resource, + cache.getIdealState(resource.getResourceName()), taskDriver)); + } + + // TODO: Replace this looping available resources with Workflow Queues + for (Iterator<TaskSchedulingStage.ResourcePriority> itr = resourcePriorityQueue.iterator(); + itr.hasNext(); ) { + Resource resource = itr.next().getResource(); + if (!computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output)) { + LogUtil + .logWarn(logger, _eventId, "Failed to assign tasks for " + 
resource.getResourceName()); + } + } + + return output; + } + + + private boolean computeResourceBestPossibleState(ClusterEvent event, ClusterDataCache cache, + CurrentStateOutput currentStateOutput, Resource resource, BestPossibleStateOutput output) { + // for each ideal state + // read the state model def + // for each resource + // get the preference list + // for each instanceName check if its alive then assign a state + + String resourceName = resource.getResourceName(); + LogUtil.logDebug(logger, _eventId, "Processing resource:" + resourceName); + // Ideal state may be gone. In that case we need to get the state model name + // from the current state + IdealState idealState = cache.getIdealState(resourceName); + if (idealState == null) { + // if ideal state is deleted, use an empty one + LogUtil.logInfo(logger, _eventId, "resource:" + resourceName + " does not exist anymore"); + idealState = new IdealState(resourceName); + idealState.setStateModelDefRef(resource.getStateModelDefRef()); + } + + Rebalancer rebalancer = null; + String rebalancerClassName = idealState.getRebalancerClassName(); + if (rebalancerClassName != null) { + if (logger.isDebugEnabled()) { + LogUtil.logDebug(logger, _eventId, + "resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName); + } + try { + rebalancer = Rebalancer.class + .cast(HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance()); + } catch (Exception e) { + LogUtil.logError(logger, _eventId, + "Exception while invoking custom rebalancer class:" + rebalancerClassName, e); + } + } + + MappingCalculator mappingCalculator = null; + if (rebalancer != null) { + try { + mappingCalculator = MappingCalculator.class.cast(rebalancer); + } catch (ClassCastException e) { + LogUtil.logWarn(logger, _eventId, + "Rebalancer does not have a mapping calculator, defaulting to SEMI_AUTO, resource: " + + resourceName); + } + } else { + return false; + } + + TaskRebalancer taskRebalancer = 
TaskRebalancer.class.cast(rebalancer); + taskRebalancer.setClusterStatusMonitor( + (ClusterStatusMonitor) event.getAttribute(AttributeName.clusterStatusMonitor.name())); + + ResourceAssignment partitionStateAssignment = null; + try { + HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); + rebalancer.init(manager); + + // Use the internal MappingCalculator interface to compute the final assignment + // The next release will support rebalancers that compute the mapping from start to finish + partitionStateAssignment = mappingCalculator + .computeBestPossiblePartitionState(cache, idealState, resource, currentStateOutput); + for (Partition partition : resource.getPartitions()) { + Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition); + output.setState(resourceName, partition, newStateMap); + } + + // Check if calculation is done successfully + return true; + } catch (Exception e) { + LogUtil + .logError(logger, _eventId, "Error computing assignment for resource " + resourceName + ". 
Skipping.", e); + // TODO : remove this part after debugging NPE + StringBuilder sb = new StringBuilder(); + + sb.append(String + .format("HelixManager is null : %s\n", event.getAttribute("helixmanager") == null)); + sb.append(String.format("Rebalancer is null : %s\n", rebalancer == null)); + sb.append(String.format("Calculated idealState is null : %s\n", idealState == null)); + sb.append(String.format("MappingCaculator is null : %s\n", mappingCalculator == null)); + sb.append( + String.format("PartitionAssignment is null : %s\n", partitionStateAssignment == null)); + sb.append(String.format("Output is null : %s\n", output == null)); + + LogUtil.logError(logger, _eventId, sb.toString()); + } + + // Exception or rebalancer is not found + return false; + } + + class ResourcePriority implements Comparable<ResourcePriority> { + final Resource _resource; + // By default, non-job resources and new jobs are assigned lowest priority + Long _priority = Long.MAX_VALUE; + + Resource getResource() { + return _resource; + } + + public ResourcePriority(Resource resource, IdealState idealState, TaskDriver taskDriver) { + _resource = resource; + + if (taskDriver != null && idealState != null + && idealState.getRebalancerClassName() != null + && idealState.getRebalancerClassName().equals(JobRebalancer.class.getName())) { + // Update priority for job resources, note that older jobs will be processed earlier + JobContext jobContext = taskDriver.getJobContext(resource.getResourceName()); + if (jobContext != null && jobContext.getStartTime() != WorkflowContext.UNSTARTED) { + _priority = jobContext.getStartTime(); + } + } + } + + @Override + public int compareTo(ResourcePriority otherJob) { + return _priority.compareTo(otherJob._priority); + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/50c9aa3a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java index f303c52..1a08468 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java @@ -130,10 +130,10 @@ public class TestWorkflowTermination extends TaskTestBase { public void testWorkflowPausedTimeout() throws InterruptedException { String workflowName = TestHelper.getTestMethodName(); long workflowExpiry = 2000; // 2sec expiry time - long timeout = 2000; + long timeout = 5000; String notStartedJobName = JOB_NAME + "-NotStarted"; - JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 100); + JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 5000); jobBuilder.setWorkflow(workflowName); Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName) .setWorkflowConfig( @@ -151,7 +151,7 @@ public class TestWorkflowTermination extends TaskTestBase { // Wait a bit for the job to get scheduled. 
Job runs for 100ms so this will very likely // to trigger a job stopped - Thread.sleep(40); + Thread.sleep(100); // Pause the queue _driver.waitToStop(workflowName, 10000L); @@ -208,17 +208,17 @@ public class TestWorkflowTermination extends TaskTestBase { String job2 = JOB_NAME + "2"; String job3 = JOB_NAME + "3"; String job4 = JOB_NAME + "4"; - long workflowExpiry = 2000; - long timeout = 5000; + long workflowExpiry = 10000; + long timeout = 8000; - JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 50); - JobConfig.Builder failedJobBuilder = createJobConfigBuilder(workflowName, true, 10); + JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 1); + JobConfig.Builder failedJobBuilder = createJobConfigBuilder(workflowName, true, 1); Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName) .setWorkflowConfig( new WorkflowConfig.Builder(workflowName) .setWorkFlowType(WORKFLOW_TYPE) - .setTimeout(timeout) + .setTimeout(timeout).setParallelJobs(4) .setFailureThreshold(1) .build() ) @@ -234,26 +234,26 @@ public class TestWorkflowTermination extends TaskTestBase { _driver.start(workflowBuilder.build()); - _driver.pollForWorkflowState(workflowName, 5000L, TaskState.FAILED); + _driver.pollForWorkflowState(workflowName, 10000L, TaskState.FAILED); // Timeout is longer than fail time, so the failover should occur earlier WorkflowContext context = _driver.getWorkflowContext(workflowName); Assert.assertTrue(context.getFinishTime() - context.getStartTime() < timeout); // job1 will complete - _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job1), 5000L, + _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job1), 10000L, TaskState.COMPLETED); // Possible race between 2 and 3 so it's likely for job2 to stay in either COMPLETED or ABORTED - _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job2), 5000L, + _driver.pollForJobState(workflowName, 
getJobNameToPoll(workflowName, job2), 10000L, TaskState.COMPLETED, TaskState.ABORTED); // job3 meant to fail - _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job3), 5000L, + _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job3), 10000L, TaskState.FAILED); // because job4 has dependency over job3, it will fail as well - _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job4), 5000L, + _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job4), 10000L, TaskState.FAILED); // Check MBean is updated
