Repository: helix Updated Branches: refs/heads/helix-0.6.x 456ddb0c4 -> 7bbb20be6
[HELIX-614] Fix the bug when job expiry time is shorter than job schedule interval in recurring job queue. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d129d3ab Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d129d3ab Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d129d3ab Branch: refs/heads/helix-0.6.x Commit: d129d3ab780adb1ff41fe5a0bfb3dafd7d5068a3 Parents: 456ddb0 Author: Lei Xia <[email protected]> Authored: Fri Nov 20 15:38:31 2015 -0800 Committer: Lei Xia <[email protected]> Committed: Fri Nov 20 15:38:31 2015 -0800 ---------------------------------------------------------------------- .../java/org/apache/helix/task/JobConfig.java | 1 + .../org/apache/helix/task/TaskRebalancer.java | 35 +++++++++++++++++--- .../java/org/apache/helix/task/TaskUtil.java | 1 + .../java/org/apache/helix/task/Workflow.java | 17 +++++++--- .../integration/task/TestRecurringJobQueue.java | 32 +++++++++++++++--- 5 files changed, 74 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/JobConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java index 30f76b7..c7c2f38 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java @@ -217,6 +217,7 @@ public class JobConfig { cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask); cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold); cfgMap.put(JobConfig.DISABLE_EXTERNALVIEW, Boolean.toString(_disableExternalView)); + cfgMap.put(JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, "" + _numConcurrentTasksPerInstance); return cfgMap; } http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index 2ff8b8c..5a86c3d 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -20,6 +20,7 @@ package org.apache.helix.task; */ import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.HashMap; @@ -113,10 +114,12 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) { final String resourceName = resource.getResourceName(); + LOG.debug("Computer Best Partition for resource: " + resourceName); // Fetch job configuration JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName); if (jobCfg == null) { + LOG.debug("Job configuration is NULL for " + resourceName); return emptyAssignment(resourceName, currStateOutput); } String workflowResource = jobCfg.getWorkflow(); @@ -124,6 +127,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // Fetch workflow configuration and context WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource); if (workflowCfg == null) { + LOG.debug("Workflow configuration is NULL for " + resourceName); return emptyAssignment(resourceName, currStateOutput); } WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource); @@ -132,6 +136,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { if (workflowCtx == null) { workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext")); workflowCtx.setStartTime(System.currentTimeMillis()); + LOG.info("Workflow context for " + resourceName + " created!"); } // check ancestor job status @@ -147,12 +152,16 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { } if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) { + LOG.debug("Job is not ready to be scheduled due to pending dependent jobs " + resourceName); return emptyAssignment(resourceName, currStateOutput); } // Clean up if workflow marked for deletion TargetState targetState = workflowCfg.getTargetState(); if (targetState == TargetState.DELETE) { + LOG.info( + "Workflow is marked as deleted " + workflowResource + + " cleaning up the workflow context."); cleanup(_manager, resourceName, workflowCfg, workflowResource); return emptyAssignment(resourceName, currStateOutput); } @@ -160,6 +169,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // Check if this workflow has been finished past its expiry. if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) { + LOG.info("Workflow " + workflowResource + + " is completed and passed expiry time, cleaning up the workflow context."); markForDeletion(_manager, workflowResource); cleanup(_manager, resourceName, workflowCfg, workflowResource); return emptyAssignment(resourceName, currStateOutput); @@ -176,6 +187,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { long jobFinishTime = jobCtx.getFinishTime(); if (!workflowCfg.isTerminable() && jobFinishTime != WorkflowContext.UNFINISHED && jobFinishTime + workflowCfg.getExpiry() <= System.currentTimeMillis()) { + LOG.info("Job " + resourceName + + " is completed and passed expiry time, cleaning up the job context."); cleanup(_manager, resourceName, workflowCfg, workflowResource); return emptyAssignment(resourceName, currStateOutput); } @@ -183,6 +196,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // The job is already in a final state (completed/failed). if (workflowCtx.getJobState(resourceName) == TaskState.FAILED || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) { + LOG.debug("Job " + resourceName + " is failed or already completed."); return emptyAssignment(resourceName, currStateOutput); } @@ -190,6 +204,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { boolean isReady = scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, resourceName, clusterData); if (!isReady) { + LOG.debug("Job " + resourceName + " is not ready to be scheduled."); return emptyAssignment(resourceName, currStateOutput); } @@ -224,6 +239,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx); TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment); + LOG.debug("Job " + resourceName + " new assignment " + Arrays + .toString(newAssignment.getMappedPartitions().toArray())); + return newAssignment; } @@ -529,6 +547,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // Remove any timers that are past-time for this workflow Date scheduledTime = SCHEDULED_TIMES.get(workflowResource); if (scheduledTime != null && currentTime > scheduledTime.getTime()) { + LOG.debug("Remove schedule timer for " + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource)); SCHEDULED_TIMES.remove(workflowResource); } @@ -536,6 +555,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { if (scheduleConfig.isRecurring()) { // Skip scheduling this workflow if it's not in a start state if (!workflowCfg.getTargetState().equals(TargetState.START)) { + LOG.debug( + "Skip scheduling since the workflow has not been started " + workflowResource); return false; } @@ -543,8 +564,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow(); if (lastScheduled != null) { WorkflowContext lastWorkflowCtx = TaskUtil.getWorkflowContext(_manager, lastScheduled); - if (lastWorkflowCtx == null - || lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) { + if (lastWorkflowCtx != null + && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) { + LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled); return false; } } @@ -559,7 +581,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // Now clone the workflow if this clone has not yet been created String newWorkflowName = workflowResource + "_" + TaskConstants.SCHEDULED + "_" + offsetMultiplier; - if (lastScheduled == null || !lastScheduled.equals(newWorkflowName)) { + LOG.debug("Ready to start workflow " + newWorkflowName); + if (!newWorkflowName.equals(lastScheduled)) { Workflow clonedWf = TaskUtil.cloneWorkflow(_manager, workflowResource, newWorkflowName, new Date( timeToSchedule)); @@ -592,9 +615,12 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // Do nothing if there is already a timer set for the this workflow with the same start time. if ((SCHEDULED_TIMES.containsKey(id) && SCHEDULED_TIMES.get(id).equals(startTime)) || SCHEDULED_TIMES.inverse().containsKey(startTime)) { + LOG.debug("Schedule timer for" + id + "and job: " + jobResource + " is up to date."); return; } - LOG.info("Schedule rebalance with id: " + id + "and job: " + jobResource); + LOG.info( + "Schedule rebalance with id: " + id + "and job: " + jobResource + " at time: " + startTime + + " delay from start: " + delayFromStart); // For workflows not yet scheduled, schedule them and record it RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource); @@ -607,6 +633,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { long currentTime = now; Date scheduledTime = SCHEDULED_TIMES.get(jobResource); if (scheduledTime != null && currentTime > scheduledTime.getTime()) { + LOG.debug("Remove schedule timer for" + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource)); SCHEDULED_TIMES.remove(jobResource); } http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index 2235b80..bb62de5 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -349,6 +349,7 @@ public class TaskUtil { IdealState is = accessor.getProperty(key); if (is != null) { accessor.updateProperty(key, is); + LOG.debug("invoke rebalance for " + key); } } http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/Workflow.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java index 4ca6e68..f69605e 100644 --- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java +++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java @@ -26,8 +26,10 @@ import java.io.Reader; import java.io.StringReader; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import org.apache.helix.task.beans.JobBean; @@ -192,10 +194,17 @@ public class Workflow { */ public void validate() { // validate dag and configs - if (!_jobConfigs.keySet().containsAll(_workflowConfig.getJobDag().getAllNodes())) { - throw new IllegalArgumentException("Nodes specified in DAG missing from config"); - } else if (!_workflowConfig.getJobDag().getAllNodes().containsAll(_jobConfigs.keySet())) { - throw new IllegalArgumentException("Given DAG lacks nodes with supplied configs"); + Set<String> jobNamesInConfig = new HashSet<String>(_jobConfigs.keySet()); + Set<String> jobNamesInDag = new HashSet<String>(_workflowConfig.getJobDag().getAllNodes()); + if (!jobNamesInConfig.equals(jobNamesInDag)) { + Set<String> jobNamesInConfigButNotInDag = new HashSet<String>(jobNamesInConfig); + jobNamesInConfigButNotInDag.removeAll(jobNamesInDag); + Set<String> jobNamesInDagButNotInConfig = new HashSet<String>(jobNamesInDag); + jobNamesInDagButNotInConfig.removeAll(jobNamesInConfig); + + throw new IllegalArgumentException( + "Job Names dismatch. Names in config but not in dag: " + jobNamesInConfigButNotInDag + + ", names in dag but not in config: " + jobNamesInDagButNotInConfig); } _workflowConfig.getJobDag().validate(); http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java index 4656a23..deca8a7 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java @@ -22,6 +22,7 @@ package org.apache.helix.integration.task; import java.util.ArrayList; import java.util.Calendar; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map;; @@ -157,10 +158,31 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { _manager.disconnect(); } + private Date getDateFromStartTime(String startTime) + { + int splitIndex = startTime.indexOf(':'); + int hourOfDay = 0, minutes = 0; + try + { + hourOfDay = Integer.parseInt(startTime.substring(0, splitIndex)); + minutes = Integer.parseInt(startTime.substring(splitIndex + 1)); + } + catch (NumberFormatException e) + { + + } + Calendar cal = Calendar.getInstance(); + cal.set(Calendar.HOUR_OF_DAY, hourOfDay); + cal.set(Calendar.MINUTE, minutes); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + return cal.getTime(); + } + private JobQueue buildRecurrentJobQueue(String jobQueueName, int delayStart) { Map<String, String> cfgMap = new HashMap<String, String>(); - cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(500000)); - cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(120)); + cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000)); + cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60)); cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS"); Calendar cal = Calendar.getInstance(); cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60); @@ -168,6 +190,8 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { cal.set(Calendar.MILLISECOND, 0); cfgMap.put(WorkflowConfig.START_TIME, WorkflowConfig.getDefaultDateFormat().format(cal.getTime())); + //cfgMap.put(WorkflowConfig.START_TIME, + //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00"))); return (new JobQueue.Builder(jobQueueName).fromMap(cfgMap)).build(); } @@ -186,7 +210,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { // Create and Enqueue jobs List<String> currentJobNames = new ArrayList<String>(); - for (int i = 0; i <= 2; i++) { + for (int i = 0; i <= 1; i++) { String targetPartition = (i == 0) ? "MASTER" : "SLAVE"; JobConfig.Builder job = @@ -213,7 +237,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { queue = buildRecurrentJobQueue(queueName, 5); _driver.createQueue(queue); currentJobNames.clear(); - for (int i = 0; i <= 2; i++) { + for (int i = 0; i <= 1; i++) { String targetPartition = (i == 0) ? "MASTER" : "SLAVE"; JobConfig.Builder job =
