Author: cutting
Date: Wed Mar 15 16:01:42 2006
New Revision: 386224

URL: http://svn.apache.org/viewcvs?rev=386224&view=rev
Log:
Fix for HADOOP-81. Job-specific parameters should be read from the
job-specific configuration, not the daemon's. This permits speculative
execution, number of map & reduce tasks, etc. to be settable in the job.
Contributed by Owen O'Malley.
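For context, a minimal driver-side sketch of what this fix makes effective: the values set below live in the job's own configuration, which JobInProgress and TaskInProgress now consult instead of the daemon's Configuration. The sketch is illustrative only and is not part of the commit; the class name JobParamsExample and the literal task counts are placeholders, and it relies only on the Configuration constructor and the JobConf accessors visible in the diff below.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical driver snippet: every value set here is stored in the
    // job's own JobConf rather than in the JobTracker/TaskTracker config.
    public class JobParamsExample {
      public static void main(String[] args) {
        Configuration defaults = new Configuration();
        JobConf job = new JobConf(defaults);

        job.setNumMapTasks(20);              // per-job map task count (placeholder value)
        job.setNumReduceTasks(5);            // per-job reduce task count (placeholder value)
        job.setSpeculativeExecution(false);  // e.g. when the maps have side effects

        System.out.println("maps=" + job.getNumMapTasks()
            + " reduces=" + job.getNumReduceTasks()
            + " speculative=" + job.getSpeculativeExecution());
        // A real driver would also set input/output and the mapper/reducer
        // classes before submitting the job (e.g. via JobClient).
      }
    }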
Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=386224&r1=386223&r2=386224&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Wed Mar 15 16:01:42 2006
@@ -244,6 +244,22 @@
     setClass("mapred.combiner.class", theClass, Reducer.class);
   }
   
+  /**
+   * Should speculative execution be used for this job?
+   * @return Defaults to true
+   */
+  public boolean getSpeculativeExecution() {
+    return getBoolean("mapred.speculative.execution", true);
+  }
+  
+  /**
+   * Turn on or off speculative execution for this job.
+   * In general, it should be turned off for map jobs that have side effects.
+   */
+  public void setSpeculativeExecution(boolean new_val) {
+    setBoolean("mapred.speculative.execution", new_val);
+  }
+  
   public int getNumMapTasks() { return getInt("mapred.map.tasks", 1); }
   public void setNumMapTasks(int n) { setInt("mapred.map.tasks", n); }
 
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=386224&r1=386223&r2=386224&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Mar 15 16:01:42 2006
@@ -50,43 +50,46 @@
     long finishTime;
     String deleteUponCompletion = null;
 
-    Configuration conf;
+    private JobConf conf;
    boolean tasksInited = false;
 
    /**
     * Create a JobInProgress with the given job file, plus a handle
     * to the tracker.
    */
-    public JobInProgress(String jobFile, JobTracker jobtracker, Configuration conf) throws IOException {
+    public JobInProgress(String jobFile, JobTracker jobtracker, 
+                         Configuration default_conf) throws IOException {
        String jobid = "job_" + jobtracker.createUniqueId();
        String url = "http://" + jobtracker.getJobTrackerMachine() + ":" + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
-        this.conf = conf;
        this.jobtracker = jobtracker;
        this.profile = new JobProfile(jobid, jobFile, url);
        this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
        this.startTime = System.currentTimeMillis();
-        this.localJobFile = new JobConf(conf).getLocalFile(JobTracker.SUBDIR, jobid + ".xml");
-        this.localJarFile = new JobConf(conf).getLocalFile(JobTracker.SUBDIR, jobid + ".jar");
-        FileSystem fs = FileSystem.get(conf);
+        JobConf default_job_conf = new JobConf(default_conf);
+        this.localJobFile = default_job_conf.getLocalFile(JobTracker.SUBDIR, 
+                                                          jobid + ".xml");
+        this.localJarFile = default_job_conf.getLocalFile(JobTracker.SUBDIR, 
+                                                          jobid + ".jar");
+        FileSystem fs = FileSystem.get(default_conf);
        fs.copyToLocalFile(new File(jobFile), localJobFile);
-        JobConf jd = new JobConf(localJobFile);
+        conf = new JobConf(localJobFile);
 
-        String jarFile = jd.getJar();
+        String jarFile = conf.getJar();
        if (jarFile != null) {
          fs.copyToLocalFile(new File(jarFile), localJarFile);
-          jd.setJar(localJarFile.getCanonicalPath());
+          conf.setJar(localJarFile.getCanonicalPath());
        }
 
-        this.numMapTasks = jd.getNumMapTasks();
-        this.numReduceTasks = jd.getNumReduceTasks();
+        this.numMapTasks = conf.getNumMapTasks();
+        this.numReduceTasks = conf.getNumReduceTasks();
 
        //
        // If a jobFile is in the systemDir, we can delete it (and
        // its JAR) upon completion
        //
-        File systemDir = jd.getSystemDir();
+        File systemDir = conf.getSystemDir();
        if (jobFile.startsWith(systemDir.getPath())) {
            this.deleteUponCompletion = jobFile;
        }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=386224&r1=386223&r2=386224&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Mar 15 16:01:42 2006
@@ -47,37 +47,37 @@
    public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.TaskInProgress");
 
    // Defines the TIP
-    String jobFile = null;
-    FileSplit split = null;
-    String hints[][] = null;
-    TaskInProgress predecessors[] = null;
-    int partition;
-    JobTracker jobtracker;
-    String id;
-    String totalTaskIds[];
-    JobInProgress job;
+    private String jobFile = null;
+    private FileSplit split = null;
+    private String hints[][] = null;
+    private TaskInProgress predecessors[] = null;
+    private int partition;
+    private JobTracker jobtracker;
+    private String id;
+    private String totalTaskIds[];
+    private JobInProgress job;
 
    // Status of the TIP
-    int numTaskFailures = 0;
-    double progress = 0;
-    String state = "";
-    long startTime = 0;
-    int completes = 0;
-    boolean failed = false;
-    TreeSet usableTaskIds = new TreeSet();
-    TreeSet recentTasks = new TreeSet();
-    Configuration conf;
+    private int numTaskFailures = 0;
+    private double progress = 0;
+    private String state = "";
+    private long startTime = 0;
+    private int completes = 0;
+    private boolean failed = false;
+    private TreeSet usableTaskIds = new TreeSet();
+    private TreeSet recentTasks = new TreeSet();
+    private JobConf conf;
 
-    TreeMap taskDiagnosticData = new TreeMap();
-    TreeMap taskStatuses = new TreeMap();
+    private TreeMap taskDiagnosticData = new TreeMap();
+    private TreeMap taskStatuses = new TreeMap();
 
-    TreeSet machinesWhereFailed = new TreeSet();
-    TreeSet tasksReportedClosed = new TreeSet();
+    private TreeSet machinesWhereFailed = new TreeSet();
+    private TreeSet tasksReportedClosed = new TreeSet();
 
    /**
     * Constructor for MapTask
     */
-    public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, Configuration conf, JobInProgress job) {
+    public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, JobConf conf, JobInProgress job) {
        this.jobFile = jobFile;
        this.split = split;
        this.jobtracker = jobtracker;
@@ -89,7 +89,7 @@
    /**
     * Constructor for ReduceTask
     */
-    public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, Configuration conf, JobInProgress job) {
+    public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, JobConf conf, JobInProgress job) {
        this.jobFile = jobFile;
        this.predecessors = predecessors;
        this.partition = partition;
@@ -408,7 +408,7 @@
        //
        if (isMapTask() &&
            recentTasks.size() <= MAX_TASK_EXECS &&
-            conf.getBoolean("mapred.speculative.execution", true) &&
+            conf.getSpeculativeExecution() &&
            (averageProgress - progress >= SPECULATIVE_GAP) &&
            (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) {
            return true;