Author: cutting
Date: Mon Jul 16 14:59:34 2007
New Revision: 556746

URL: http://svn.apache.org/viewvc?view=rev&rev=556746
Log:
HADOOP-1400.  Make JobClient retry requests, so that clients survive
jobtracker problems.  Contributed by Owen.
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=556746&r1=556745&r2=556746
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jul 16 14:59:34 2007
@@ -367,6 +367,9 @@
     This reduces the namenode's memory requirements and increases data
     integrity.  (Raghu Angadi via cutting)
 
+115. HADOOP-1400.  Make JobClient retry requests, so that clients can
+     survive jobtracker problems.  (omalley via cutting)
+
 Release 0.13.0 - 2007-06-08
 
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=556746&r1=556745&r2=556746
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Mon Jul 16 14:59:34 2007
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.retry.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
@@ -201,32 +202,52 @@
   public JobClient() {
   }
 
-  public JobClient(Configuration conf) throws IOException {
+  public JobClient(JobConf conf) throws IOException {
     setConf(conf);
-    init();
+    init(conf);
   }
 
-  public void init() throws IOException {
+  public void init(JobConf conf) throws IOException {
     String tracker = conf.get("mapred.job.tracker", "local");
     if ("local".equals(tracker)) {
       this.jobSubmitClient = new LocalJobRunner(conf);
     } else {
-      this.jobSubmitClient = (JobSubmissionProtocol) 
-        RPC.getProxy(JobSubmissionProtocol.class,
-                     JobSubmissionProtocol.versionID,
-                     JobTracker.getAddress(conf), conf);
+      this.jobSubmitClient = createProxy(JobTracker.getAddress(conf), conf);
     }
   }
-    
+
   /**
-   * Build a job client, connect to the indicated job tracker.
+   * Create a proxy JobSubmissionProtocol that retries timeouts.
+   * @param addr the address to connect to
+   * @param conf the server's configuration
+   * @return a proxy object that will retry timeouts
+   * @throws IOException
    */
-  public JobClient(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
-    this.jobSubmitClient = (JobSubmissionProtocol) 
+  private JobSubmissionProtocol createProxy(InetSocketAddress addr,
+                                            Configuration conf
+                                            ) throws IOException {
+    JobSubmissionProtocol raw = (JobSubmissionProtocol) 
       RPC.getProxy(JobSubmissionProtocol.class,
-                   JobSubmissionProtocol.versionID, jobTrackAddr, conf);
+                   JobSubmissionProtocol.versionID, addr, conf);
+    RetryPolicy backoffPolicy =
+      RetryPolicies.retryUpToMaximumCountWithProportionalSleep
+      (5, 10, java.util.concurrent.TimeUnit.SECONDS);
+    Map<Class<? extends Exception>, RetryPolicy> handlers = 
+      new HashMap<Class<? extends Exception>, RetryPolicy>();
+    handlers.put(SocketTimeoutException.class, backoffPolicy);
+    RetryPolicy backoffTimeOuts = 
+      RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,handlers);
+    return (JobSubmissionProtocol)
+      RetryProxy.create(JobSubmissionProtocol.class, raw, backoffTimeOuts);
   }
 
+  /**
+   * Build a job client, connect to the indicated job tracker.
+   */
+  public JobClient(InetSocketAddress jobTrackAddr, 
+                   Configuration conf) throws IOException {
+    jobSubmitClient = createProxy(jobTrackAddr, conf);
+  }
 
   /**
    */
@@ -270,15 +291,15 @@
     //
 
     // Create a number of filenames in the JobTracker's fs namespace
-    Path submitJobDir = new Path(job.getSystemDir(), "submit_" + 
-                                 Integer.toString(r.nextInt(Integer.MAX_VALUE),
-                                                  36));
+    String jobId = jobSubmitClient.getNewJobId();
+    Path submitJobDir = new Path(job.getSystemDir(), jobId);
+    FileSystem fs = getFs();
+    LOG.debug("default FileSystem: " + fs.getUri());
+    fs.delete(submitJobDir);
     Path submitJobFile = new Path(submitJobDir, "job.xml");
     Path submitJarFile = new Path(submitJobDir, "job.jar");
     Path submitSplitFile = new Path(submitJobDir, "job.split");
-    FileSystem fs = getFs();
-    LOG.debug("default FileSystem: " + fs.getUri());
     // try getting the md5 of the archives
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     URI[] tfiles = DistributedCache.getCacheFiles(job);
@@ -379,7 +400,7 @@
     //
     // Now, actually submit the job (using the submit name)
     //
-    JobStatus status = jobSubmitClient.submitJob(submitJobFile.toString());
+    JobStatus status = jobSubmitClient.submitJob(jobId);
     if (status != null) {
       return new NetworkedJob(status);
     } else {
@@ -723,9 +744,6 @@
       throw new RuntimeException("JobClient:" + cmd);
     }
 
-    // initialize JobClient
-    init();
-
     // Process args
     String submitJobFile = null;
     String jobid = null;
@@ -751,11 +769,20 @@
       }
     }
 
+    // initialize JobClient
+    JobConf conf = null;
+    if (submitJobFile != null) {
+      conf = new JobConf(submitJobFile);
+    } else {
+      conf = new JobConf();
+    }
+    init(conf);
+
     // Submit the request
     int exitCode = -1;
     try {
       if (submitJobFile != null) {
-        RunningJob job = submitJob(submitJobFile);
+        RunningJob job = submitJob(conf);
         System.out.println("Created job " + job.getJobID());
       } else if (getStatus) {
         RunningJob job = getJob(jobid);
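The retry wiring above is the heart of HADOOP-1400: the raw RPC proxy is wrapped in a dynamic proxy that re-invokes a method when it throws SocketTimeoutException, retrying up to 5 times with proportionally growing sleeps, and failing immediately on any other exception. A minimal self-contained sketch of the same pattern, using only the RetryPolicies/RetryProxy calls the patch itself uses; the PingProtocol interface is a hypothetical stand-in for JobSubmissionProtocol:

    import java.io.IOException;
    import java.net.SocketTimeoutException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.io.retry.RetryPolicies;
    import org.apache.hadoop.io.retry.RetryPolicy;
    import org.apache.hadoop.io.retry.RetryProxy;

    public class RetrySketch {
      /** Hypothetical stand-in for JobSubmissionProtocol. */
      interface PingProtocol {
        String ping() throws IOException;
      }

      static PingProtocol withRetries(PingProtocol raw) {
        // Retry up to 5 times, sleeping proportionally longer (units of
        // 10 seconds) between attempts -- the same policy that
        // JobClient.createProxy() installs above.
        RetryPolicy backoff = RetryPolicies
          .retryUpToMaximumCountWithProportionalSleep(5, 10, TimeUnit.SECONDS);
        // Map only SocketTimeoutException to the backoff policy; every
        // other exception falls through to TRY_ONCE_THEN_FAIL.
        Map<Class<? extends Exception>, RetryPolicy> handlers =
          new HashMap<Class<? extends Exception>, RetryPolicy>();
        handlers.put(SocketTimeoutException.class, backoff);
        RetryPolicy policy = RetryPolicies
          .retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, handlers);
        // RetryProxy.create returns a java.lang.reflect.Proxy that applies
        // the policy around every method of the interface.
        return (PingProtocol) RetryProxy.create(PingProtocol.class, raw, policy);
      }
    }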
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=556746&r1=556745&r2=556746
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Jul 16 14:59:34 2007
@@ -29,7 +29,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -115,28 +114,28 @@
    * Create a JobInProgress with the given job file, plus a handle
    * to the tracker.
    */
-  public JobInProgress(String jobFile, JobTracker jobtracker, 
-                       Configuration default_conf) throws IOException {
-    jobId = jobtracker.getTrackerIdentifier() + "_" +jobtracker.createJobId();
-    String fullJobId = "job_" + jobId;
+  public JobInProgress(String jobid, JobTracker jobtracker, 
+                       JobConf default_conf) throws IOException {
+    this.jobId = jobid;
     String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
-        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + fullJobId;
+        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
     this.jobtracker = jobtracker;
-    this.status = new JobStatus(fullJobId, 0.0f, 0.0f, JobStatus.PREP);
+    this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
     this.startTime = System.currentTimeMillis();
     this.localFs = (LocalFileSystem)FileSystem.getLocal(default_conf);
 
     JobConf default_job_conf = new JobConf(default_conf);
     this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
-                                                      +"/"+fullJobId + ".xml");
+                                                      +"/"+jobid + ".xml");
     this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
-                                                      +"/"+ fullJobId + ".jar");
+                                                      +"/"+ jobid + ".jar");
     FileSystem fs = FileSystem.get(default_conf);
-    fs.copyToLocalFile(new Path(jobFile), localJobFile);
+    Path jobFile = new Path(default_conf.getSystemDir(), jobid + "/job.xml");
+    fs.copyToLocalFile(jobFile, localJobFile);
     conf = new JobConf(localJobFile);
     this.priority = conf.getJobPriority();
-    this.profile = new JobProfile(conf.getUser(), fullJobId, jobFile, url,
-                                  conf.getJobName());
+    this.profile = new JobProfile(conf.getUser(), jobid, 
+                                  jobFile.toString(), url, jobid);
     String jarFile = conf.getJar();
     if (jarFile != null) {
       fs.copyToLocalFile(new Path(jarFile), localJarFile);
@@ -151,15 +150,16 @@
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
 
-    JobHistory.JobInfo.logSubmitted(fullJobId, conf.getJobName(), conf.getUser(), 
-                                    System.currentTimeMillis(), jobFile); 
+    JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(), 
+                                    System.currentTimeMillis(), 
+                                    jobFile.toString()); 
 
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
     this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
     this.jobMetrics.setTag("user", conf.getUser());
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
-    this.jobMetrics.setTag("jobId", fullJobId);
+    this.jobMetrics.setTag("jobId", jobid);
   }
 
   /**
@@ -1076,8 +1076,8 @@
 
       // Delete temp dfs dirs created if any, like in case of 
       // speculative exn of reduces.  
-      String tempDir = conf.get("mapred.system.dir") + "/job_" + jobId; 
-      fs.delete(new Path(tempDir)); 
+      Path tempDir = new Path(conf.getSystemDir(), jobId); 
+      fs.delete(tempDir); 
 
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobId()+": "+e);
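Note the new contract in the constructor: JobInProgress no longer trusts a client-supplied job-file path, it derives the location as <system-dir>/<jobid>/job.xml. A small sketch of that lookup, assuming only the getSystemDir() call visible in the patch (the id value is illustrative):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class JobFileLayout {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        String jobid = "job_200707161459_0001";  // illustrative id
        // Mirrors the constructor above: <system-dir>/<jobid>/job.xml
        Path jobFile = new Path(conf.getSystemDir(), jobid + "/job.xml");
        System.out.println(jobFile);
      }
    }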
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?view=diff&rev=556746&r1=556745&r2=556746
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Mon Jul 16 14:59:34 2007
@@ -33,11 +33,20 @@
    *changed
    */
   public static final long versionID = 3L;
+
+  /**
+   * Allocate a name for the job.
+   * @return a unique job name for submitting jobs.
+   * @throws IOException
+   */
+  public String getNewJobId() throws IOException;
+
   /**
    * Submit a Job for execution.  Returns the latest profile for
    * that job.
+   * The job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
    */
-  public JobStatus submitJob(String jobFile) throws IOException;
+  public JobStatus submitJob(String jobName) throws IOException;
 
   /**
    * Get the current status of the cluster
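With getNewJobId() added, submission becomes a two-step handshake: reserve an id, stage the job files under <b>system-dir</b>/<b>id</b>, then submit by id. Roughly, for any caller of the protocol (this sketch lives in org.apache.hadoop.mapred because the interface is package-private; the staging step is elided since JobClient does the real copying):

    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;

    class TwoStepSubmitSketch {
      static JobStatus submit(JobSubmissionProtocol tracker, JobConf job)
          throws IOException {
        String jobId = tracker.getNewJobId();              // 1. reserve an id
        Path submitDir = new Path(job.getSystemDir(), jobId);
        // 2. write job.xml, job.jar and job.split into submitDir
        //    (elided -- JobClient.submitJob() performs the actual copying)
        return tracker.submitJob(jobId);                   // 3. submit by id
      }
    }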
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=556746&r1=556745&r2=556746
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Jul 16 14:59:34 2007
@@ -99,7 +99,7 @@
    * @param conf configuration for the JobTracker.
    * @throws IOException
    */
-  public static void startTracker(Configuration conf) throws IOException {
+  public static void startTracker(JobConf conf) throws IOException {
     if (tracker != null)
       throw new IOException("JobTracker already running.");
     runTracker = true;
@@ -604,12 +604,12 @@
   static final String SUBDIR = "jobTracker";
   FileSystem fs;
   Path systemDir;
-  private Configuration conf;
+  private JobConf conf;
 
   /**
    * Start the JobTracker process, listen on the indicated port
    */
-  JobTracker(Configuration conf) throws IOException {
+  JobTracker(JobConf conf) throws IOException {
     //
     // Grab some static constants
     //
@@ -1441,9 +1441,27 @@
     LOG.warn("Report from " + taskTracker + ": " + errorMessage);
   }
 
+  /**
+   * Remove the job_ from jobids to get the unique string.
+   */
+  static String getJobUniqueString(String jobid) {
+    return jobid.substring(4);
+  }
+
   ////////////////////////////////////////////////////
   // JobSubmissionProtocol
   ////////////////////////////////////////////////////
+
+  /**
+   * Allocates a new JobId string.
+   */
+  public String getNewJobId() {
+    synchronized (this) {
+      return "job_" + getTrackerIdentifier() + "_" + 
+             idFormat.format(nextJobId++);
+    }
+  }
+
   /**
    * JobTracker.submitJob() kicks off a new job.  
    *
@@ -1677,12 +1695,6 @@
   public JobInProgress getJob(String jobid) {
     return jobs.get(jobid);
   }
-  /**
-   * Grab random num for job id
-   */
-  String createJobId() {
-    return idFormat.format(nextJobId++);
-  }
 
   ////////////////////////////////////////////////////
   // Methods to track all the TaskTrackers
@@ -1773,8 +1785,7 @@
     }
 
     try {
-      Configuration conf=new Configuration();
-      startTracker(conf);
+      startTracker(new JobConf());
     } catch (Throwable e) {
       LOG.fatal(StringUtils.stringifyException(e));
       System.exit(-1);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=556746&r1=556745&r2=556746
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Jul 16 14:59:34 2007
@@ -26,7 +26,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.*;
@@ -39,7 +38,7 @@
 
   private FileSystem fs;
   private HashMap<String, Job> jobs = new HashMap<String, Job>();
-  private Configuration conf;
+  private JobConf conf;
   private int map_tasks = 0;
   private int reduce_tasks = 0;
 
@@ -51,7 +50,7 @@
   private class Job extends Thread
     implements TaskUmbilicalProtocol {
-    private String file;
+    private Path file;
     private String id;
     private JobConf job;
     private Random random = new Random();
@@ -74,18 +73,18 @@
       return TaskUmbilicalProtocol.versionID;
     }
 
-    public Job(String file, Configuration conf) throws IOException {
-      this.file = file;
-      this.id = "job_" + newId();
+    public Job(String jobid, JobConf conf) throws IOException {
+      this.file = new Path(conf.getSystemDir(), jobid + "/job.xml");
+      this.id = jobid;
       this.mapoutputFile = new MapOutputFile();
       this.mapoutputFile.setConf(conf);
 
       this.localFile = new JobConf(conf).getLocalPath("localRunner/"+id+".xml");
       this.localFs = FileSystem.getLocal(conf);
 
-      fs.copyToLocalFile(new Path(file), localFile);
+      fs.copyToLocalFile(file, localFile);
       this.job = new JobConf(localFile);
-      profile = new JobProfile(job.getUser(), id, file, 
+      profile = new JobProfile(job.getUser(), id, file.toString(), 
                                "http://localhost:8080/", job.getJobName());
       status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING);
 
@@ -119,7 +118,7 @@
           splits[i].write(buffer);
           BytesWritable split = new BytesWritable();
           split.set(buffer.getData(), 0, buffer.getLength());
-          MapTask map = new MapTask(jobId, file, "tip_m_" + mapId, 
+          MapTask map = new MapTask(jobId, file.toString(), "tip_m_" + mapId, 
                                     mapId, i,
                                     splits[i].getClass().getName(),
                                     split);
@@ -152,8 +151,9 @@
         }
 
         {
-          ReduceTask reduce = new ReduceTask(jobId, file, "tip_r_0001",
-                                             reduceId, 0, mapIds.size());
+          ReduceTask reduce = new ReduceTask(jobId, file.toString(), 
                                             "tip_r_0001",
+                                             reduceId, 0, mapIds.size());
           JobConf localConf = new JobConf(job);
           reduce.localizeConfiguration(localConf);
           reduce.setConf(localConf);
@@ -187,7 +187,7 @@
 
       } finally {
         try {
-          fs.delete(new Path(file).getParent());  // delete submit dir
+          fs.delete(file.getParent());  // delete submit dir
           localFs.delete(localFile);              // delete local copy
         } catch (IOException e) {
           LOG.warn("Error cleaning up "+id+": "+e);
@@ -258,7 +258,7 @@
 
   }
 
-  public LocalJobRunner(Configuration conf) throws IOException {
+  public LocalJobRunner(JobConf conf) throws IOException {
     this.fs = FileSystem.get(conf);
     this.conf = conf;
     myMetrics = new JobTrackerMetrics(new JobConf(conf));
@@ -266,8 +266,13 @@
 
   // JobSubmissionProtocol methods
 
-  public JobStatus submitJob(String jobFile) throws IOException {
-    return new Job(jobFile, this.conf).status;
+  private int jobid = 0;
+  public String getNewJobId() {
+    return "job_local_" + Integer.toString(++jobid);
+  }
+
+  public JobStatus submitJob(String jobid) throws IOException {
+    return new Job(jobid, this.conf).status;
   }
 
   public void killJob(String id) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=556746&r1=556745&r2=556746
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Mon Jul 16 14:59:34 2007
@@ -114,7 +114,7 @@
   /**
    * Constructor for MapTask
    */
-  public TaskInProgress(String uniqueString, String jobFile, 
+  public TaskInProgress(String jobid, String jobFile, 
                         String splitClass, BytesWritable split, 
                         JobTracker jobtracker, JobConf conf, 
                         JobInProgress job, int partition) {
@@ -126,13 +126,13 @@
     this.conf = conf;
     this.partition = partition;
     setMaxTaskAttempts();
-    init(uniqueString);
+    init(JobTracker.getJobUniqueString(jobid));
   }
 
   /**
    * Constructor for ReduceTask
    */
-  public TaskInProgress(String uniqueString, String jobFile, 
+  public TaskInProgress(String jobid, String jobFile, 
                         int numMaps, 
                         int partition, JobTracker jobtracker, JobConf conf,
                         JobInProgress job) {
@@ -143,7 +143,7 @@
     this.job = job;
     this.conf = conf;
     setMaxTaskAttempts();
-    init(uniqueString);
+    init(JobTracker.getJobUniqueString(jobid));
   }
 
   /**
   * Set the max number of attempts before we declare a TIP as "failed"
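Finally, the id scheme that ties these pieces together: the JobTracker now mints ids of the form job_<trackerIdentifier>_<NNNN>, LocalJobRunner mints job_local_<N>, and getJobUniqueString() strips the fixed four-character "job_" prefix for use in TaskInProgress names. A sketch of the arithmetic; the NumberFormat setup is an assumption about idFormat, which the patch uses but does not show, and the identifier value is illustrative:

    import java.text.NumberFormat;

    public class JobIdFormats {
      public static void main(String[] args) {
        // Presumed equivalent of JobTracker's idFormat: four digits, no grouping.
        NumberFormat idFormat = NumberFormat.getInstance();
        idFormat.setMinimumIntegerDigits(4);
        idFormat.setGroupingUsed(false);
        String trackerIdentifier = "200707161459";  // illustrative start-time id

        int nextJobId = 1;
        String jobid = "job_" + trackerIdentifier + "_"
                       + idFormat.format(nextJobId++);
        System.out.println(jobid);            // job_200707161459_0001

        // getJobUniqueString() drops the fixed "job_" prefix (4 characters),
        // leaving the portion that task names are built from.
        System.out.println(jobid.substring(4));  // 200707161459_0001
      }
    }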