Author: tomwhite Date: Thu Jul 12 14:11:30 2007 New Revision: 555770 URL: http://svn.apache.org/viewvc?view=rev&rev=555770 Log: HADOOP-1433. Add job priority. Contributed by Johan Oskarsson.
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobPriority.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=555770&r1=555769&r2=555770 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu Jul 12 14:11:30 2007 @@ -346,6 +346,8 @@ 107. HADOOP-1570. Permit jobs to enable and disable the use of hadoop's native library. (Arun C Murthy via cutting) +108. HADOOP-1433. Add job priority. (Johan Oskarsson via tomwhite) + Release 0.13.0 - 2007-06-08 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?view=diff&rev=555770&r1=555769&r2=555770 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu Jul 12 14:11:30 2007 @@ -686,6 +686,28 @@ setInt("mapred.max.reduce.failures.percent", percent); } + /** + * Set job priority for this job. + * + * @param prio + */ + public void setJobPriority(JobPriority prio) { + set("mapred.job.priority", prio.toString()); + } + + /** + * Get the job priority for this job. + */ + public JobPriority getJobPriority() { + String prio = get("mapred.job.priority"); + if(prio == null) { + return JobPriority.NORMAL; + } + + return JobPriority.valueOf(prio); + } + + /** Find a jar that contains a class of the same name, if any. * It will return a jar file, even if that is not the first thing * on the class path that has a class with the same name. Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=555770&r1=555769&r2=555770 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Jul 12 14:11:30 2007 @@ -69,7 +69,8 @@ int reduceFailuresPercent = 0; int failedMapTIPs = 0; int failedReduceTIPs = 0; - + + JobPriority priority = JobPriority.NORMAL; JobTracker jobtracker = null; Map<String,List<TaskInProgress>> hostToMaps = new HashMap<String,List<TaskInProgress>>(); @@ -133,6 +134,7 @@ FileSystem fs = FileSystem.get(default_conf); fs.copyToLocalFile(new Path(jobFile), localJobFile); conf = new JobConf(localJobFile); + this.priority = conf.getJobPriority(); this.profile = new JobProfile(conf.getUser(), fullJobId, jobFile, url, conf.getJobName()); String jarFile = conf.getJar(); @@ -298,6 +300,16 @@ } public synchronized int finishedReduces() { return finishedReduceTasks; + } + public JobPriority getPriority() { + return this.priority; + } + public void setPriority(JobPriority priority) { + if(priority == null) { + this.priority = JobPriority.NORMAL; + } else { + this.priority = priority; + } } /** Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobPriority.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobPriority.java?view=auto&rev=555770 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobPriority.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobPriority.java Thu Jul 12 14:11:30 2007 @@ -0,0 +1,15 @@ +package org.apache.hadoop.mapred; + +/** + * Used to describe the priority of the running job. + * + */ +public enum JobPriority { + + VERY_HIGH, + HIGH, + NORMAL, + LOW, + VERY_LOW; + +} \ No newline at end of file Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=555770&r1=555769&r2=555770 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Jul 12 14:11:30 2007 @@ -24,6 +24,7 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashSet; @@ -335,8 +336,8 @@ List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>(); long retireBefore = System.currentTimeMillis() - RETIRE_JOB_INTERVAL; - synchronized (jobsByArrival) { - for(JobInProgress job: jobsByArrival) { + synchronized (jobsByPriority) { + for(JobInProgress job: jobsByPriority) { if (job.getStatus().getRunState() != JobStatus.RUNNING && job.getStatus().getRunState() != JobStatus.PREP && (job.getFinishTime() < retireBefore)) { @@ -347,13 +348,13 @@ if (!retiredJobs.isEmpty()) { synchronized (JobTracker.this) { synchronized (jobs) { - synchronized (jobsByArrival) { + synchronized (jobsByPriority) { synchronized (jobInitQueue) { for (JobInProgress job: retiredJobs) { removeJobTasks(job); jobs.remove(job.getProfile().getJobId()); jobInitQueue.remove(job); - jobsByArrival.remove(job); + jobsByPriority.remove(job); String jobUser = job.getProfile().getUser(); synchronized (userToJobsMap) { ArrayList<JobInProgress> userJobs = @@ -518,7 +519,7 @@ // All the known jobs. (jobid->JobInProgress) Map<String, JobInProgress> jobs = new TreeMap<String, JobInProgress>(); - List<JobInProgress> jobsByArrival = new ArrayList<JobInProgress>(); + List<JobInProgress> jobsByPriority = new ArrayList<JobInProgress>(); // (user -> list of JobInProgress) TreeMap<String, ArrayList<JobInProgress>> userToJobsMap = @@ -904,7 +905,7 @@ // in memory; information about the purged jobs is available via // JobHistory. synchronized (jobs) { - synchronized (jobsByArrival) { + synchronized (jobsByPriority) { synchronized (jobInitQueue) { synchronized (userToJobsMap) { String jobUser = job.getProfile().getUser(); @@ -945,7 +946,7 @@ userJobs.remove(0); jobs.remove(rjob.getProfile().getJobId()); jobInitQueue.remove(rjob); - jobsByArrival.remove(rjob); + jobsByPriority.remove(rjob); LOG.info("Retired job with id: '" + rjob.getProfile().getJobId() + "' of user: '" + @@ -1261,8 +1262,8 @@ } int totalCapacity = numTaskTrackers * maxCurrentTasks; - synchronized(jobsByArrival){ - for (Iterator it = jobsByArrival.iterator(); it.hasNext();) { + synchronized(jobsByPriority){ + for (Iterator it = jobsByPriority.iterator(); it.hasNext();) { JobInProgress job = (JobInProgress) it.next(); if (job.getStatus().getRunState() == JobStatus.RUNNING) { int totalMapTasks = job.desiredMaps(); @@ -1306,11 +1307,11 @@ // task. // - synchronized (jobsByArrival) { + synchronized (jobsByPriority) { if (numMaps < maxMapLoad) { int totalNeededMaps = 0; - for (Iterator it = jobsByArrival.iterator(); it.hasNext();) { + for (Iterator it = jobsByPriority.iterator(); it.hasNext();) { JobInProgress job = (JobInProgress) it.next(); if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue; @@ -1346,7 +1347,7 @@ if (numReduces < maxReduceLoad) { int totalNeededReduces = 0; - for (Iterator it = jobsByArrival.iterator(); it.hasNext();) { + for (Iterator it = jobsByPriority.iterator(); it.hasNext();) { JobInProgress job = (JobInProgress) it.next(); if (job.getStatus().getRunState() != JobStatus.RUNNING || job.numReduceTasks == 0) { @@ -1453,17 +1454,44 @@ totalSubmissions++; JobInProgress job = new JobInProgress(jobFile, this, this.conf); synchronized (jobs) { - synchronized (jobsByArrival) { + synchronized (jobsByPriority) { synchronized (jobInitQueue) { jobs.put(job.getProfile().getJobId(), job); - jobsByArrival.add(job); + jobsByPriority.add(job); jobInitQueue.add(job); + resortPriority(); jobInitQueue.notifyAll(); } } } myMetrics.submitJob(); return job.getStatus(); + } + + /** + * Sort jobs by priority and then by start time. + */ + public void resortPriority() { + Comparator<JobInProgress> comp = new Comparator<JobInProgress>() { + public int compare(JobInProgress o1, JobInProgress o2) { + int res = o1.getPriority().compareTo(o2.getPriority()); + if(res == 0) { + if(o1.getStartTime() < o2.getStartTime()) + res = -1; + else + res = (o1.getStartTime()==o2.getStartTime() ? 0 : 1); + } + + return res; + } + }; + + synchronized(jobsByPriority) { + Collections.sort(jobsByPriority, comp); + } + synchronized (jobInitQueue) { + Collections.sort(jobInitQueue, comp); + } } public synchronized ClusterStatus getClusterStatus() { Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?view=diff&rev=555770&r1=555769&r2=555770 ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Thu Jul 12 14:11:30 2007 @@ -102,8 +102,14 @@ JobInProgress job = (JobInProgress) tracker.getJob(jobId); + String action = request.getParameter("action"); + if("changeprio".equalsIgnoreCase(action)) { + job.setPriority(JobPriority.valueOf(request.getParameter("prio"))); + tracker.resortPriority(); + } + if(JspHelper.conf.getBoolean(PRIVATE_ACTIONS_KEY, false)) { - String action = request.getParameter("action"); + action = request.getParameter("action"); if(action!=null && action.equalsIgnoreCase("confirm")) { printConfirm(out, jobId); return; @@ -234,9 +240,20 @@ </table> +<hr>Change priority from <%=job.getPriority()%> to: +<% + JobPriority jobPrio = job.getPriority(); + for (JobPriority prio : JobPriority.values()) { + if(jobPrio != prio) { + %><a href="jobdetails.jsp?action=changeprio&jobid=<%=jobId%>&prio=<%=prio%>"> <%=prio%> </a><% + } + } +%> +</br> + <% if(JspHelper.conf.getBoolean(PRIVATE_ACTIONS_KEY, false) && runState == JobStatus.RUNNING) { %> - <hr><a href="jobdetails.jsp?action=confirm&jobid=<%=jobId%>"> Kill this job </a> + <br/><a href="jobdetails.jsp?action=confirm&jobid=<%=jobId%>"> Kill this job </a> <% } %> <hr> <a href="jobtracker.jsp">Go back to JobTracker</a><br>